CASD : Extraction de quantiles de POTE

# Configure IPython to display every expression result in a cell,
# not only the last one (this file is a notebook export).
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
# Campaign year being processed; switch by (un)commenting.
year = "2020"
# year = "2019"
# year = "2018"
# NOTE(review): PARQUET_PATH hard-codes 2020 while `year` above is switchable —
# the two must be kept in sync manually when changing campaign year.
PARQUET_PATH = r"C:\Users\Public\Documents\TRAVAIL\pote_brut\per_pote_2020/"
OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\agregats\data_2023/"

# Chunk size for batched processing (2 MiB worth of rows).
taille_chunk = 2 * 2**20  # 2**20 = 1_048_576
# taille_chunk = 5000
import leximpact_prepare_data

# Display the installed package version (notebook cell output).
leximpact_prepare_data.__version__
'0.0.35'
import gc
import json
import pandas as pd
import vaex
from tqdm import tqdm

from leximpact_prepare_data.scenario_tools.calib_and_copules import *
# Load time: 8 seconds for 39,264,695 rows — lazy loading at work!
dfv = vaex.open(PARQUET_PATH + "*")
# dfv = vaex.open(ARROW_PATH + "pote_brutes_2019_5.arrow")
# dfv
# tc.assertEqual(len(dfv), 39264696)
CPU times: total: 484 ms
Wall time: 3.41 s

Variables continues

# Continuous POTE variables to aggregate; the parquet columns are lower-case,
# so normalise the human-readable names accordingly.
continuous_variables = [
    name.lower()
    for name in "revkire Z6ns Z6nt Z6nu Z6rs Z6rt Z6ru Z6ps Z6pt Z6pu".split(" ")
]

Calcul des agrégats

def compute_agg(vdf, columns=None):
    """Compute descriptive aggregates for each column of a vaex dataframe.

    For every column, counts the zero / non-zero rows and computes the sum
    over all rows plus mean / variance / std restricted to non-zero rows
    (via a named vaex selection).

    Parameters
    ----------
    vdf : vaex dataframe (mutated in place: NaN values are replaced by 0).
    columns : list of column names; falls back to all columns when falsy.

    Returns
    -------
    pandas.DataFrame with one row per column
    (name, nb_line, lenzero, pct_zero, sum, mean, variance, std_dev).
    """
    sub_total = []
    # Resolve the default BEFORE using `columns`: the original resolved it
    # after fillna(), so a falsy argument passed column_names=None there.
    columns = columns if columns else vdf.get_column_names()
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    ldf = vdf.shape[0]
    # Guard: an empty dataframe would divide by zero in pct_zero below.
    if ldf == 0:
        return pd.DataFrame(sub_total)
    for col in tqdm(columns):
        # Named selection restricting statistics to non-zero rows.
        name = f"{col}_non_zero"
        vdf.select(f"{col} != 0", name=name)
        nb_no_zero = int(vdf.count("*", selection=name))
        lenzero = ldf - nb_no_zero
        dict_col = {
            "name": col,
            "nb_line": ldf,
            "lenzero": lenzero,
            "pct_zero": lenzero / ldf * 100,
            "sum": int(vdf.sum(col)),
            # All-zero columns have an empty selection: report 0.0 instead of NaN.
            "mean": float(vdf.mean(col, selection=name)) if nb_no_zero > 0 else 0.0,
            "variance": float(vdf.var(col, selection=name)) if nb_no_zero > 0 else 0.0,
            "std_dev": float(vdf.std(col, selection=name)) if nb_no_zero > 0 else 0.0,
        }
        sub_total.append(dict_col)
    return pd.DataFrame(sub_total)
# Aggregate all continuous variables over the full POTE dataset.
df_agg = compute_agg(dfv, continuous_variables)
100%|██████████████████████████████████████████| 10/10 [00:23<00:00,  2.37s/it]
CPU times: total: 55.9 s
Wall time: 23.7 s
# Thousands-separator formatting for the displayed table below.
pd.set_option("display.float_format", "{:,}".format)
# Export the aggregates to a CSV file
df_agg.to_csv(OUT_PATH + "/agregats_POTE_per_" + year + ".csv", index=False)
df_agg
name nb_line lenzero pct_zero sum mean variance std_dev
0 revkire 39818227 2789455 7.005472644475104 1103673567589 29,805.83767641552 8,789,585,767.879045 93,752.79072010504
1 z6ns 39818227 39469032 99.12302725081155 1520652349 4,354.736891994445 4,344,848,231.190162 65,915.46276246691
2 z6nt 39818227 39680906 99.65513030000055 551681175 4,017.4567254826284 53,306,402.075728334 7,301.123343412871
3 z6nu 39818227 39815493 99.99313379774544 682265170 249,548.34308705194 162,499,798,509,408.2 12,747,540.880868286
4 z6rs 39818227 39217415 98.49111312766388 2018343140 3,359.3589009540424 604,944,065,969.981 777,781.5027178172
5 z6rt 39818227 39538656 99.29788184692401 581164111 2,078.7710849837786 19,249,465.058399823 4,387.421231019404
6 z6ru 39818227 39815579 99.99334977923553 16374062 6,183.558157099698 39,464,198.75794705 6,282.053705433204
7 z6ps 39818227 39808934 99.97666144200745 120841371 13,003.48337458302 124,057,915.09423018 11,138.128886587288
8 z6pt 39818227 39814742 99.9912477268262 41885154 12,018.695552367288 63,592,864.15838769 7,974.513412013782
9 z6pu 39818227 39817621 99.99847808391871 6629966 10,940.53795379538 69,424,337.89542419 8,332.126853056438
# Free the aggregate table before the memory-hungry quantile step.
# (`gc` is already imported at the top of the file, so the redundant
# re-import that used to sit here was dropped.)
del df_agg

gc.collect()
14360

Calcul des quantiles

def compute_quantile(vdf, columns=None, quantiles=(10,)):
    """Compute quantile breakdowns per column and export them as JSON files.

    For each column, builds a Quantile object over the full column, then for
    each requested quantile count writes an anonymised JSON file
    ``{OUT_PATH}/quantile_POTE_{quantile}_{year}_{col}.json``.

    Parameters
    ----------
    vdf : vaex dataframe (mutated in place: NaN values are replaced by 0).
    columns : list of column names; falls back to all columns when falsy.
    quantiles : iterable of quantile counts, e.g. [10] for deciles.
        The original default of ``10`` (a bare int) crashed on iteration;
        it is now the equivalent iterable ``(10,)``.

    Columns that raise (e.g. statistical-secrecy violations) are logged and
    skipped so the remaining columns are still processed.
    """
    # Resolve the default BEFORE using `columns`, mirroring compute_agg.
    columns = columns if columns else vdf.get_column_names()
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    for col in tqdm(columns):
        try:
            # NOTE: tolist() materialises the whole column in memory
            # (~5 minutes per column on CASD).
            q = Quantile(vdf[col].tolist())
            for quantile in quantiles:
                q_dict = q.get_quantile(quantile)
                # Apply statistical-secrecy anonymisation before export.
                anonimyze_lower_and_upper_bound(q_dict)
                with open(
                    f"{OUT_PATH}/quantile_POTE_{quantile}_{year}_{col}.json", "w"
                ) as f:
                    f.write(json.dumps(q_dict))
            # Release the materialised column before moving on.
            del q
            gc.collect()
        except Exception as e:
            print(f"ERROR processing {col} {e.__class__.__name__} : {e}")
            continue
# Runtime on CASD: about 5 minutes per column
compute_quantile(dfv, columns=continuous_variables, quantiles=[10])
 40%|████████████████▊                         | 4/10 [09:32<13:37, 136.28s/it]
ERROR processing z6nu SecretViolation : Quantile : ERROR SECRET STATISTIQUE > 0.85 NON RESPECTE (ratio=42.739)
100%|█████████████████████████████████████████| 10/10 [22:36<00:00, 135.61s/it]
CPU times: total: 22min 39s
Wall time: 22min 36s

Calcul des copules

continuous_variables
['revkire',
 'z6ns',
 'z6nt',
 'z6nu',
 'z6rs',
 'z6rt',
 'z6ru',
 'z6ps',
 'z6pt',
 'z6pu']
nb_bucket_var = 10
# Build copulas of each variable as a function of the reference tax income (rfr)
for copule_var in ["revkire"]:
    # Sorting is required for bucketing; note this rebinds the global dfv.
    dfv = dfv.sort(copule_var)
    centile = get_primary_buckets(
        dfv, nb_bucket_var, variable_to_split_on=copule_var, minimal_bucket_size=12
    )

    for variable in tqdm(continuous_variables):
        # Skip the pivot variable itself.
        if variable != "revkire":
            try:
                copule = get_copulas(
                    dfv,
                    copule_var,
                    variable,
                    nb_bucket_var,
                    centile,
                    minimal_bucket_size=1000,
                    debug=False,
                )
                # Apply statistical-secrecy anonymisation before export.
                anonimyze_lower_and_upper_bound(copule["copules"])
                with open(
                    f"{OUT_PATH}CopulePote-{nb_bucket_var}-{year}-{copule_var}-{variable}.json",
                    "w",
                ) as f:
                    f.write(json.dumps(copule))
            except Exception as e:
                # Best-effort: variables violating statistical secrecy are
                # logged and skipped rather than aborting the whole run.
                print(f"ERROR processing {variable}", e)
                # raise e
 20%|████████▌                                  | 2/10 [00:48<03:14, 24.34s/it]
ERROR processing z6ns SECRET STATISTIQUE > 0.85 NON RESPECTE
DistribDeVar : less than 1000 for non_zero elements. 3981431 elements at 0
DistribDeVar : less than 1000 for non_zero elements. 3981504 elements at 0
DistribDeVar : less than 1000 for non_zero elements. 3981521 elements at 0
DistribDeVar : less than 1000 for non_zero elements. 3981466 elements at 0
DistribDeVar : less than 1000 for non_zero elements. 3981220 elements at 0
 30%|████████████▉                              | 3/10 [01:38<04:05, 35.05s/it]
# Print the name of every column currently defined on the dataframe.
for column_name in dfv.columns:
    print(column_name)
rnirp8
rbg
__revkire
__MCIRRD
__MNIMXG
__RNIMJW
__revkire_par_part
__assiette_csg_revenus_capital
__revenus_capitaux_prelevement_bareme
__revenus_capitaux_prelevement_liberatoire
__revenus_capitaux_prelevement_forfaitaire_unique_ir
__rente_viagere_titre_onereux_net
__revenu_categoriel_foncier
__assiette_csg_plus_values
__revenus_individuels
__revenus_individuels_par_part
__rev_salaire
__retraites
__rpns_imposables
__chomage_et_indemnites
__pension_invalidite
__pension_alimentaire
# Drop the dataframe reference and force a collection to release memory.
del dfv
gc.collect()
0