#!conda list
CASD: extraction of aggregates
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"

year = "2019"
# year = "2018"
OUT_PATH = "C:/Users/Public/Documents/TRAVAIL/agregats/data/"
OUT_PATH = OUT_PATH + "assiettes_pote_brutes_" + year + "-chunk/"
taille_chunk = 2 * 2**20  # 2**20 = 1_048_576
import leximpact_prepare_data

leximpact_prepare_data.__version__
'0.0.23'
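taille_chunk is not used again in this notebook; it presumably sized the upstream job that wrote the chunk files read below. A minimal sketch of such a split, assuming a hypothetical raw extract "POTE_2019.csv" (the actual source file name and format are not given here):

# Hypothetical upstream step: split a raw CSV extract into chunk files that
# vaex can memory-map. "POTE_2019.csv" is an assumed name; only OUT_PATH and
# taille_chunk come from this notebook.
import pandas as pd
import vaex

for i, chunk in enumerate(pd.read_csv("POTE_2019.csv", chunksize=taille_chunk)):
    vaex.from_pandas(chunk).export_hdf5(f"{OUT_PATH}pote_{i:04d}.hdf5")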
import gc
import json

import pandas as pd
import vaex
from tqdm import tqdm
from leximpact_prepare_data.scenario_tools.calib_and_copules import *

# Loading takes 8 seconds for 39,264,695 rows: lazy loading at work!
dfv = vaex.open(OUT_PATH + "*")
# dfv = vaex.open(ARROW_PATH + "pote_brutes_2019_5.arrow")
# dfv
CPU times: total: 375 ms
Wall time: 372 ms
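Opening is fast because vaex memory-maps the chunk files and defers all computation: expressions stay virtual until an aggregation forces evaluation. A small illustration on the frame opened above (no assumptions beyond dfv itself):

# A virtual column: nothing is read from disk or computed here
halved = dfv["revkire"] / 2
# Evaluation only happens when an aggregation is requested, out of core
halved.sum()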
# Execution time: 2 seconds
# pyramide_des_ages = dfv.groupby(by="aged", agg={"age": vaex.agg.count("aged")})
# pyramide_des_ages
# dfv.info()
# tc.assertEqual(dfv["revkire"].count(), 39512402)  # 2019
# tc is assumed to be a TestCase-like helper from the star import above
tc.assertEqual(dfv["revkire"].count(), 39818227)  # 2020
dfv.get_column_names()
# Remove id fip18_c
_ = dfv.drop("fip18_c", inplace=True)
['rnsgbd',
'rnsgld',
'revkire',
'z1aj',
'z1ap',
'z1as',
'z1bj',
'z1bp',
'z1bs',
'z1cj',
'z1cw',
'z1dw',
'z2ch',
'z2dc',
'z2dh',
'z2tr',
'z3ua',
'z3vg',
'z3vz',
'z4ba',
'z4bb',
'z4bc',
'z4bd',
'z4be',
'z6de',
'z8sc',
'z8sw',
'z8sx',
'cics',
'mnimqg',
'fip18_c',
'revenus_capitaux_prelevement_bareme',
'revenus_capitaux_prelevement_liberatoire',
'revenus_capitaux_prelevement_forfaitaire_unique_ir',
'rente_viagere_titre_onereux_net',
'revenu_categoriel_foncier',
'assiette_csg_plus_values',
'assiette_csg_revenus_capital',
'retraites',
'chomage_et_indemnites',
'rev_salaire']
Continuous variables
# "Z1ak Z1bk txmoy impot impotnet rnirp8 rnimeh tsirna mnipeg rnirai rnirdu rnirgi Z1az Z1bz".split(" ")# continuous_variables = dfv.get_column_names()
# continuous_variables = [c.lower() for c in continuous_variables]
continuous_variables = [
    "revkire",
    "revkire_par_part",
    "rbg",
    "rnirp8",
    "assiette_csg_revenus_capital",
    "revenus_capitaux_prelevement_bareme",
    "revenus_capitaux_prelevement_liberatoire",
    "revenus_capitaux_prelevement_forfaitaire_unique_ir",
    "rente_viagere_titre_onereux_net",
    "revenu_categoriel_foncier",
    "assiette_csg_plus_values",
    "revenus_individuels",
    "revenus_individuels_par_part",
    "rev_salaire",
    "retraites",
    "chomage_et_indemnites",
    "rag",
    "ric",
    "rnc",
    "pension_invalidite",
    "pension_alimentaire",
]

Aggregate computation
def compute_agg(vdf, columns):
    """Aggregate each column: row count, zero share, sum, and moments on non-zero values."""
    sub_total = []
    columns = columns if columns else vdf.get_column_names()
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    ldf = vdf.shape[0]
    for col in tqdm(columns):
        # print(col)
        name = f"{col}_non_zero"
        vdf.select(f"{col} != 0", name=name)
        nb_no_zero = int(vdf.count("*", selection=name))
        lenzero = ldf - nb_no_zero
        dict_col = {
            "name": col,
            "nb_line": ldf,
            "lenzero": lenzero,
            "pct_zero": lenzero / ldf * 100,
            "sum": int(vdf.sum(col)),
            "mean": float(vdf.mean(col, selection=name)) if nb_no_zero > 0 else 0.0,
            "variance": float(vdf.var(col, selection=name)) if nb_no_zero > 0 else 0.0,
            "std_dev": float(vdf.std(col, selection=name)) if nb_no_zero > 0 else 0.0,
        }
        sub_total.append(dict_col)
    return pd.DataFrame(sub_total)

# Time on CASD: 30 s per column before the machine upgrade, under 3 seconds after!
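A quick sanity check of compute_agg on a tiny in-memory frame; the values are toy data (not POTE), and the column names are reused purely for illustration:

import numpy as np

# Two toy columns with some zeros, to exercise the non-zero selection logic
toy = vaex.from_arrays(
    revkire=np.array([0.0, 10.0, 20.0, 0.0]),
    rev_salaire=np.array([5.0, 0.0, 15.0, 25.0]),
)
compute_agg(toy, ["revkire", "rev_salaire"])  # one row of aggregates per column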
df_agg = compute_agg(dfv, continuous_variables)
# dfv.mnipeg.mean()
pd.set_option("display.float_format", "{:,}".format)
# Export to a file (OUT_PATH already ends with a slash)
df_agg.to_csv(OUT_PATH + "agregats_POTE_revenus_rici_" + year + ".csv", index=False)
df_agg
del df_agg
gc.collect()

Extraction of distribution functions (for calibration)
dfv = dfv.fillna(0)
CPU times: total: 23.4 s
Wall time: 23.2 s
# calib = get_calib(dfv, "rimp", 10)
CPU times: total: 0 ns
Wall time: 0 ns
# Redefinition, to be migrated into the package!
from typing import Dict
def get_primary_buckets(
    vdx_sort: vaex.dataframe.DataFrameLocal,
    nb_bucket: int,
    variable_to_split_on: str = "revkire",
    minimal_bucket_size=12,
    debug=False,
) -> Dict:
    """
    Goal: split the dataset into buckets along one variable.
    Each bucket stores all the non-zero values of "variable".
    ::vdx_sort:: The dataset, sorted on the variable to study
    ::nb_bucket:: Desired number of buckets
    ::variable_to_split_on:: Variable on which to split buckets
    ::debug:: Enables a debug mode that prints traces
    """
    dataset_size = vdx_sort.shape[0]  # Number of rows
    # Convert the column to a plain Python list
    variable_array = vdx_sort.to_arrays(
        column_names=[variable_to_split_on], selection=False, array_type="python"
    )[0]
    # Check that the dataset is sorted, sampling ~1,000 positions from the end
    previous = variable_array[-1]
    for i in range(1, 1000):
        idx = dataset_size // i
        idx = idx if idx != dataset_size else dataset_size - 1
        if previous < variable_array[idx]:
            raise DatasetNotSorted(
                f"Your dataset is not sorted on {variable_to_split_on}!"
            )
        previous = variable_array[idx]
    # Split the variable (e.g. the RFR) into buckets:
    borders = get_borders(
        dataset_size=dataset_size,
        nb_bucket=nb_bucket,
        minimal_bucket_size=minimal_bucket_size,
        debug=debug,
    )
    # Drop the last border to avoid index-out-of-range checks; it is put back below
    borders = borders[:-1]
    i = 0
    # Remove borders whose upper-bound value would still be zero
    while i < len(borders):
        if variable_array[borders[i]] < 1:
            if debug:
                print(
                    f"WARNING: dropping border index {i} ({borders[i]}): useless, its upper-bound value is {variable_array[borders[i]]}"
                )
            borders = borders[:i] + borders[i + 1 :]
        else:
            i += 1
    frontieres_valeurs = [0] + [variable_array[frontiere] for frontiere in borders]
    # Append a very high end value (10^15 euros)
    frontieres_valeurs += [10**15]
    # Put the last border back
    borders += [dataset_size]
    dic = {"borders_values": frontieres_valeurs, "borders": borders}
    del variable_array
    gc.collect()
    return dic

# TODO: import from package
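A hypothetical check of get_primary_buckets on synthetic, pre-sorted data; it assumes get_borders and DatasetNotSorted really are provided by the calib_and_copules star import above:

import numpy as np

# 12,000 synthetic, strictly positive, already-sorted values (not POTE data)
rfr = np.sort(np.random.default_rng(0).exponential(30_000, 12_000))
toy = vaex.from_arrays(revkire=rfr)
buckets = get_primary_buckets(toy, nb_bucket=4, minimal_bucket_size=100)
buckets["borders"]  # row indices delimiting the buckets
buckets["borders_values"]  # variable values at those borders, 0 and 10**15 included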
# def get_fake_upper_bound(val):
#     if val == 1e15:
#         return 1e15
#     else:
#         return 10 ** (len(str(int(val))))

# get_fake_upper_bound(100.5)
# calib = get_copulas(dfv, "revkire", "revkire", 10, une_tranche_rfr)
# calib["copules"]["buckets"][-1]
# calib = calib["copules"][0]["buckets"]
# keep_upper_bound_secret(calib)
# calib
# calib["buckets"][-1]["seuil_var_supp"] = "secret"
# calib
# from IPython.display import JSON
# JSON(calib)

# %%time
# # Time on CASD: 138 s per iteration
# nb_bucket_var = 100
# for variable in tqdm(continuous_variables):
#     # calib = get_calib(dfv, variable, nb_bucket_var)
#     # print(variable)
#     calib = compute_copule_vaex(dfv, variable, nb_bucket_var, une_tranche_rfr)
#     calib["copules"][0]["buckets"][-1]["seuil_var_supp"] = "secret"
#     with open(f"{OUT_PATH}CalibPote-{nb_bucket_var}-{year}-{variable}.json", "w") as f:
#         f.write(json.dumps(calib["copules"][0]["buckets"]))

# %%time
# # Time on CASD: 538 s per iteration!
# nb_bucket_var = 1000
# for variable in tqdm(continuous_variables):
#     # calib = get_calib(dfv, variable, nb_bucket_var)
#     # print(variable)
#     calib = compute_copule_vaex(dfv, variable, nb_bucket_var, une_tranche_rfr)
#     calib["copules"][0]["buckets"][-1]["seuil_var_supp"] = "secret"
#     with open(f"{OUT_PATH}CalibPote-{nb_bucket_var}-{year}-{variable}.json", "w") as f:
#         f.write(json.dumps(calib["copules"][0]["buckets"]))

Copula extraction
nb_bucket_var = 10
# We build copulas conditioned on the RFR, but also on individual income, to see
# whether that improves the imputation.
# Copulas conditioned on assiette_csg_revenus_capital are useful to inspect how the
# various capital income types are distributed within total capital income.
for copule_var in [
    "revkire",
    "revkire_par_part",
    "revenus_individuels",
    "revenus_individuels_par_part",
    "assiette_csg_revenus_capital",
]:
    centile = get_primary_buckets(
        dfv, nb_bucket_var, variable_to_split_on=copule_var, minimal_bucket_size=500
    )
    for variable in tqdm(continuous_variables):
        try:
            copule = get_copulas(
                dfv,
                copule_var,
                variable,
                nb_bucket_var,
                centile,
                minimal_bucket_size=100,
            )
            # copule["copules"][0]["buckets"][-1]["upper_bound"] = "secret"
            anonimyze_lower_and_upper_bound(copule["copules"])
            with open(
                f"{OUT_PATH}CopulePote-{nb_bucket_var}-{year}-{copule_var}-{variable}.json",
                "w",
            ) as f:
                f.write(json.dumps(copule))
        except Exception as e:
            print(f"ERROR processing {variable}", e)
            # raise e
100%|███████████████████████████████████████████| 3/3 [07:55<00:00, 158.59s/it]
CPU times: total: 7min 54s
Wall time: 7min 55s
# dfv.column_names
# copule = get_copulas(dfv, "revkire", variable, nb_bucket_var, centile_rfr)
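To double-check an export, one of the JSON files written by the loop above can be read back; the payload layout depends on get_copulas, so it is only loaded here, not interpreted:

# File name pattern taken from the export loop above
with open(f"{OUT_PATH}CopulePote-{nb_bucket_var}-{year}-revkire-rev_salaire.json") as f:
    copule_back = json.load(f)
type(copule_back)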