# CASD : Extraction de quantiles de POTE
# Notebook residue: display the value of every expression in a cell,
# not only the last one.
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
# Campaign year to process (alternatives kept for quick switching).
year = "2020"
# year = "2019"
# year = "2018"

# NOTE(review): PARQUET_PATH hard-codes "2020" in the directory name; it must be
# kept in sync with ``year`` manually — TODO confirm / derive from ``year``.
PARQUET_PATH = r"C:\Users\Public\Documents\TRAVAIL\pote_brut\per_pote_2020/"
OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\agregats\data_2023/"

# Chunk size in rows; 2**20 = 1_048_576.
taille_chunk = 2 * 2**20
# taille_chunk = 5000

import leximpact_prepare_data

# Echo the library version (was '0.0.35' in the original notebook run).
leximpact_prepare_data.__version__
import gc
import json

import pandas as pd
import vaex
from tqdm import tqdm

# Star import provides Quantile, anonimyze_lower_and_upper_bound,
# get_primary_buckets and get_copulas used below.
from leximpact_prepare_data.scenario_tools.calib_and_copules import *

# Load time: ~8 seconds for 39,264,695 rows — vive le lazy loading!
# Lazily open every parquet chunk of the POTE extract as one vaex DataFrame.
dfv = vaex.open(PARQUET_PATH + "*")
# dfv = vaex.open(ARROW_PATH + "pote_brutes_2019_5.arrow")
# dfv
# tc.assertEqual(len(dfv), 39264696)
# Original notebook timing: CPU times: total: 484 ms — Wall time: 3.41 s

# Continuous variables (Variables continues): POTE column names,
# lower-cased to match the parquet schema.
continuous_variables = "revkire Z6ns Z6nt Z6nu Z6rs Z6rt Z6ru Z6ps Z6pt Z6pu".split(" ")
continuous_variables = [c.lower() for c in continuous_variables]
def compute_agg(vdf, columns=None):
    """Compute per-column aggregates of a vaex DataFrame.

    For each column: row count, number and share of zero values, sum, and
    mean / variance / standard deviation computed on the *non-zero* subset.

    Side effect: fills NA values with 0 in-place in ``vdf``.

    Args:
        vdf: a vaex DataFrame.
        columns: column names to aggregate; defaults to every column of ``vdf``
            (the original body already handled a falsy ``columns``, so the
            ``None`` default is backward-compatible).

    Returns:
        pandas.DataFrame with one row of aggregates per column.
    """
    # Resolve the column list BEFORE using it (the original called fillna
    # with a possibly-unset ``columns`` first).
    columns = columns if columns else vdf.get_column_names()
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    nb_lines = vdf.shape[0]
    rows = []
    for col in tqdm(columns):
        selection_name = f"{col}_non_zero"
        # Named selection reused by the non-zero statistics below.
        vdf.select(f"{col} != 0", name=selection_name)
        nb_non_zero = int(vdf.count("*", selection=selection_name))
        nb_zero = nb_lines - nb_non_zero
        rows.append(
            {
                "name": col,
                "nb_line": nb_lines,
                "lenzero": nb_zero,
                # Guard against an empty DataFrame.
                "pct_zero": nb_zero / nb_lines * 100 if nb_lines else 0.0,
                "sum": int(vdf.sum(col)),
                # Statistics on the non-zero subset only; 0.0 for all-zero columns
                # (vaex would otherwise return NaN).
                "mean": float(vdf.mean(col, selection=selection_name)) if nb_non_zero > 0 else 0.0,
                "variance": float(vdf.var(col, selection=selection_name)) if nb_non_zero > 0 else 0.0,
                "std_dev": float(vdf.std(col, selection=selection_name)) if nb_non_zero > 0 else 0.0,
            }
        )
    return pd.DataFrame(rows)


df_agg = compute_agg(dfv, continuous_variables)
# Original run: 10/10 columns in ~23 s.
# CPU times: total: 55.9 s
# Wall time: 23.7 s
# Display floats with thousands separators when df_agg is echoed.
pd.set_option("display.float_format", "{:,}".format)

# Persist the aggregates for this campaign year.
csv_path = OUT_PATH + "/agregats_POTE_per_" + year + ".csv"
df_agg.to_csv(csv_path, index=False)
df_agg
# Original notebook output:
# |   | name    | nb_line  | lenzero  | pct_zero           | sum           | mean               | variance              | std_dev            |
# |---|---------|----------|----------|--------------------|---------------|--------------------|-----------------------|--------------------|
# | 0 | revkire | 39818227 | 2789455  | 7.005472644475104  | 1103673567589 | 29,805.83767641552 | 8,789,585,767.879045  | 93,752.79072010504 |
# | 1 | z6ns    | 39818227 | 39469032 | 99.12302725081155  | 1520652349    | 4,354.736891994445 | 4,344,848,231.190162  | 65,915.46276246691 |
# | 2 | z6nt    | 39818227 | 39680906 | 99.65513030000055  | 551681175     | 4,017.4567254826284| 53,306,402.075728334  | 7,301.123343412871 |
# | 3 | z6nu    | 39818227 | 39815493 | 99.99313379774544  | 682265170     | 249,548.34308705194| 162,499,798,509,408.2 | 12,747,540.880868286 |
# | 4 | z6rs    | 39818227 | 39217415 | 98.49111312766388  | 2018343140    | 3,359.3589009540424| 604,944,065,969.981   | 777,781.5027178172 |
# | 5 | z6rt    | 39818227 | 39538656 | 99.29788184692401  | 581164111     | 2,078.7710849837786| 19,249,465.058399823  | 4,387.421231019404 |
# | 6 | z6ru    | 39818227 | 39815579 | 99.99334977923553  | 16374062      | 6,183.558157099698 | 39,464,198.75794705   | 6,282.053705433204 |
# | 7 | z6ps    | 39818227 | 39808934 | 99.97666144200745  | 120841371     | 13,003.48337458302 | 124,057,915.09423018  | 11,138.128886587288 |
# | 8 | z6pt    | 39818227 | 39814742 | 99.9912477268262   | 41885154      | 12,018.695552367288| 63,592,864.15838769   | 7,974.513412013782 |
# | 9 | z6pu    | 39818227 | 39817621 | 99.99847808391871  | 6629966       | 10,940.53795379538 | 69,424,337.89542419   | 8,332.126853056438 |
# Free the aggregate table before the quantile pass.
# (gc is already imported at the top of the file; no need to re-import it.)
del df_agg
gc.collect()  # returned 14360 in the original run
# Calcul des quantiles
def compute_quantile(vdf, columns=None, quantiles=10):
    """Compute quantiles for each column and dump them to JSON files.

    Args:
        vdf: a vaex DataFrame; NA values are filled with 0 in place.
        columns: column names to process; defaults to every column of ``vdf``.
        quantiles: an int or an iterable of quantile counts (e.g. ``[10]`` for
            deciles). The original default ``10`` was iterated directly and
            raised ``TypeError``; an int is now normalised to a one-element list.

    Side effects:
        Writes one ``quantile_POTE_{q}_{year}_{col}.json`` file per
        (quantile, column) under ``OUT_PATH``. Failures (e.g. statistical
        secrecy violations) are printed and the column is skipped.
    """
    # Bug fix: make the default int usable by the loop below.
    if isinstance(quantiles, int):
        quantiles = [quantiles]
    # Resolve the column list before filling NAs on it.
    columns = columns if columns else vdf.get_column_names()
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    for col in tqdm(columns):
        try:
            # WARNING: .tolist() materialises the whole column in memory.
            q = Quantile(vdf[col].tolist())
            for quantile in quantiles:
                q_dict = q.get_quantile(quantile)
                # Strip identifying extreme values (statistical secrecy).
                anonimyze_lower_and_upper_bound(q_dict)
                with open(
                    f"{OUT_PATH}/quantile_POTE_{quantile}_{year}_{col}.json", "w"
                ) as f:
                    f.write(json.dumps(q_dict))
            # Release the materialised column before the next one.
            del q
            gc.collect()
        except Exception as e:
            print(f"ERROR processing {col} {e.__class__.__name__} : {e}")
            continue
# Timing on CASD: ~5 minutes per column.
compute_quantile(dfv, columns=continuous_variables, quantiles=[10])
# Original run output:
#   ERROR processing z6nu SecretViolation : Quantile : ERROR SECRET STATISTIQUE > 0.85 NON RESPECTE (ratio=42.739)
#   10/10 columns in ~22 min 36 s (CPU times: total: 22min 39s — Wall time: 22min 36s).
# Calcul des copules
continuous_variables
# Original notebook echo:
# ['revkire', 'z6ns', 'z6nt', 'z6nu', 'z6rs', 'z6rt', 'z6ru', 'z6ps', 'z6pt', 'z6pu']
nb_bucket_var = 10

# Build copulas of every continuous variable against the reference income (rfr).
for copule_var in ["revkire"]:
    # The DataFrame must be sorted by the reference variable before bucketing.
    dfv = dfv.sort(copule_var)
    centile = get_primary_buckets(
        dfv, nb_bucket_var, variable_to_split_on=copule_var, minimal_bucket_size=12
    )
    for variable in tqdm(continuous_variables):
        if variable == "revkire":
            # Skip the reference variable itself.
            continue
        try:
            copule = get_copulas(
                dfv,
                copule_var,
                variable,
                nb_bucket_var,
                centile,
                minimal_bucket_size=1000,
                debug=False,
            )
            # Strip identifying extreme values before export (statistical secrecy).
            anonimyze_lower_and_upper_bound(copule["copules"])
            out_file = f"{OUT_PATH}CopulePote-{nb_bucket_var}-{year}-{copule_var}-{variable}.json"
            with open(out_file, "w") as f:
                f.write(json.dumps(copule))
        except Exception as e:
            print(f"ERROR processing {variable}", e)
            # raise e
# Original run output:
#   ERROR processing z6ns SECRET STATISTIQUE > 0.85 NON RESPECTE
#   DistribDeVar : less than 1000 for non_zero elements. 3981431 elements at 0
#   DistribDeVar : less than 1000 for non_zero elements. 3981504 elements at 0
#   DistribDeVar : less than 1000 for non_zero elements. 3981521 elements at 0
#   DistribDeVar : less than 1000 for non_zero elements. 3981466 elements at 0
#   DistribDeVar : less than 1000 for non_zero elements. 3981220 elements at 0
# Inspect the (real and virtual ``__``-prefixed) columns of the vaex DataFrame.
for d in dfv.columns:
    print(d)
# Original output:
#   rnirp8
#   rbg
#   __revkire
#   __MCIRRD
#   __MNIMXG
#   __RNIMJW
#   __revkire_par_part
#   __assiette_csg_revenus_capital
#   __revenus_capitaux_prelevement_bareme
#   __revenus_capitaux_prelevement_liberatoire
#   __revenus_capitaux_prelevement_forfaitaire_unique_ir
#   __rente_viagere_titre_onereux_net
#   __revenu_categoriel_foncier
#   __assiette_csg_plus_values
#   __revenus_individuels
#   __revenus_individuels_par_part
#   __rev_salaire
#   __retraites
#   __rpns_imposables
#   __chomage_et_indemnites
#   __pension_invalidite
#   __pension_alimentaire
# Release the vaex DataFrame at the end of the run.
del dfv
gc.collect()  # returned 0 in the original run