CASD: Aggregate extraction

#!conda list
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
year = "2019"
# year = "2019"
# year = "2018"
OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\agregats\data/"
OUT_PATH = OUT_PATH + "assiettes_pote_brutes_" + year + "-chunk/"
taille_chunk = 2 * 2**20  # 2**20 = 1_048_576
import leximpact_prepare_data

leximpact_prepare_data.__version__
'0.0.23'
import gc
import json

import pandas as pd
import vaex
from tqdm import tqdm

from leximpact_prepare_data.scenario_tools.calib_and_copules import *
# Loading time: 8 seconds for 39,264,695 rows, hooray for lazy loading!
dfv = vaex.open(OUT_PATH + "*")
# dfv = vaex.open(ARROW_PATH + "pote_brutes_2019_5.arrow")
# dfv
CPU times: total: 375 ms
Wall time: 372 ms
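
The open is near-instant because vaex memory-maps the chunk files and defers all computation until an aggregation forces a scan. A minimal runnable sketch of that laziness, on a synthetic array rather than POTE data:

# Minimal sketch of vaex lazy evaluation (synthetic data, not POTE).
import numpy as np

df_demo = vaex.from_arrays(x=np.arange(1_000_000))
expr = df_demo.x * 2  # virtual expression: nothing computed yet
expr.sum()  # the scan over the data happens only here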
# Execution time: 2 seconds
# pyramide_des_ages = dfv.groupby(by="aged", agg={"age": vaex.agg.count("aged")})
# pyramide_des_ages
# dfv.info()
# tc.assertEqual(dfv["revkire"].count(), 39512402)  # 2019
tc.assertEqual(dfv["revkire"].count(), 39818227)  # 2020
dfv.get_column_names()
# Remove id fip18_c
_ = dfv.drop("fip18_c", inplace=True)
['rnsgbd',
 'rnsgld',
 'revkire',
 'z1aj',
 'z1ap',
 'z1as',
 'z1bj',
 'z1bp',
 'z1bs',
 'z1cj',
 'z1cw',
 'z1dw',
 'z2ch',
 'z2dc',
 'z2dh',
 'z2tr',
 'z3ua',
 'z3vg',
 'z3vz',
 'z4ba',
 'z4bb',
 'z4bc',
 'z4bd',
 'z4be',
 'z6de',
 'z8sc',
 'z8sw',
 'z8sx',
 'cics',
 'mnimqg',
 'fip18_c',
 'revenus_capitaux_prelevement_bareme',
 'revenus_capitaux_prelevement_liberatoire',
 'revenus_capitaux_prelevement_forfaitaire_unique_ir',
 'rente_viagere_titre_onereux_net',
 'revenu_categoriel_foncier',
 'assiette_csg_plus_values',
 'assiette_csg_revenus_capital',
 'retraites',
 'chomage_et_indemnites',
 'rev_salaire']

Continuous variables

# "Z1ak Z1bk txmoy impot impotnet rnirp8 rnimeh tsirna mnipeg rnirai rnirdu rnirgi Z1az Z1bz".split(" ")
# continuous_variables = dfv.get_column_names()
# continuous_variables = [c.lower() for c in continuous_variables]
continuous_variables = [
    "revkire",
    "revkire_par_part",
    "rbg",
    "rnirp8",
    "assiette_csg_revenus_capital",
    "revenus_capitaux_prelevement_bareme",
    "revenus_capitaux_prelevement_liberatoire",
    "revenus_capitaux_prelevement_forfaitaire_unique_ir",
    "rente_viagere_titre_onereux_net",
    "revenu_categoriel_foncier",
    "assiette_csg_plus_values",
    "revenus_individuels",
    "revenus_individuels_par_part",
    "rev_salaire",
    "retraites",
    "chomage_et_indemnites",
    "rag",
    "ric",
    "rnc",
    "pension_invalidite",
    "pension_alimentaire",
]

Aggregate computation

def compute_agg(vdf, columns):
    """Compute per-column aggregates: zero share, sum, mean, variance, std dev."""
    sub_total = []
    columns = columns if columns else vdf.get_column_names()
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    ldf = vdf.shape[0]
    for col in tqdm(columns):
        # Mean, variance and std dev are computed on non-zero values only.
        name = f"{col}_non_zero"
        vdf.select(f"{col} != 0", name=name)
        nb_no_zero = int(vdf.count("*", selection=name))
        lenzero = ldf - nb_no_zero
        dict_col = {
            "name": col,
            "nb_line": ldf,
            "lenzero": lenzero,
            "pct_zero": lenzero / ldf * 100,
            "sum": int(vdf.sum(col)),
            "mean": float(vdf.mean(col, selection=name)) if nb_no_zero > 0 else 0.0,
            "variance": float(vdf.var(col, selection=name)) if nb_no_zero > 0 else 0.0,
            "std_dev": float(vdf.std(col, selection=name)) if nb_no_zero > 0 else 0.0,
        }
        sub_total.append(dict_col)
    return pd.DataFrame(sub_total)
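
For reference, here is what compute_agg returns on a tiny synthetic vaex DataFrame (illustrative values, not POTE data):

# Illustrative only: compute_agg on synthetic data.
toy = vaex.from_arrays(a=[0.0, 1.0, 2.0, 0.0], b=[5.0, 0.0, 0.0, 0.0])
compute_agg(toy, ["a", "b"])
# -> one row per column: name, nb_line, lenzero, pct_zero, sum, mean, variance, std_dev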
# Time on CASD: 30 s per column before the machine upgrade, under 3 seconds after!
df_agg = compute_agg(dfv, continuous_variables)
# dfv.mnipeg.mean()
pd.set_option("display.float_format", "{:,}".format)
# Export to a file
df_agg.to_csv(OUT_PATH + "agregats_POTE_revenus_rici_" + year + ".csv", index=False)
df_agg
del df_agg

gc.collect()

Extraction of distribution functions (for calibration)

dfv = dfv.fillna(0)
CPU times: total: 23.4 s
Wall time: 23.2 s
# calib = get_calib(dfv, "rimp", 10)
CPU times: total: 0 ns
Wall time: 0 ns
# Redefinition, to be migrated into the package!
from typing import Dict


def get_primary_buckets(
    vdx_sort: vaex.dataframe.DataFrameLocal,
    nb_bucket: int,
    variable_to_split_on: str = "revkire",
    minimal_bucket_size=12,
    debug=False,
) -> Dict:
    """
    Goal: split the variable into buckets.
    Each bucket stores all the non-zero values of the variable.
    ::vdx_sort:: The dataset, sorted on the variable to study
    ::nb_bucket:: Number of buckets wanted
    ::variable_to_split_on:: Variable on which to split buckets
    ::debug:: Enable a debug mode that prints traces
    """
    dataset_size = vdx_sort.shape[0]  # Number of rows
    # Convert to a plain Python array
    variable_array = vdx_sort.to_arrays(
        column_names=[variable_to_split_on], selection=False, array_type="python"
    )[0]
    # Check that the dataset really is sorted, by sampling up to 1,000 positions:
    # as i grows, dataset_size // i decreases, so the sampled values must not increase.
    previous = variable_array[-1]
    for i in range(1, 1000):
        idx = dataset_size // i
        idx = idx if idx != dataset_size else dataset_size - 1
        if previous < variable_array[idx]:
            raise DatasetNotSorted(
                f"Your dataset is not sorted on {variable_to_split_on}!"
            )
        previous = variable_array[idx]

    # Split the variable (RFR by default) into buckets:
    borders = get_borders(
        dataset_size=dataset_size,
        nb_bucket=nb_bucket,
        minimal_bucket_size=minimal_bucket_size,
        debug=debug,
    )

    # Drop the last border to avoid index-out-of-range checks; it is added back below
    borders = borders[:-1]
    i = 0
    # Remove borders whose buckets would only contain zeros
    while i < len(borders):
        if variable_array[borders[i]] < 1:
            if debug:
                print(
                    f"WARNING: removing border at index {i} ({borders[i]}): useless because its upper bound value is {variable_array[borders[i]]}"
                )
            borders = borders[:i] + borders[i + 1 :]
        else:
            i += 1
    frontieres_valeurs = [0] + [variable_array[frontiere] for frontiere in borders]
    # Append a very high end value (10^15 euros)
    frontieres_valeurs += [10**15]
    # Put the last border back
    borders += [dataset_size]
    dic = {"borders_values": frontieres_valeurs, "borders": borders}
    del variable_array
    gc.collect()
    return dic
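
A quick sanity check of get_primary_buckets on synthetic, pre-sorted data (purely illustrative, not POTE):

# Illustrative only: get_primary_buckets on synthetic, sorted data.
import numpy as np

toy_sorted = vaex.from_arrays(revkire=np.sort(np.random.uniform(0, 1e5, 10_000)))
toy_buckets = get_primary_buckets(toy_sorted, 5, variable_to_split_on="revkire")
toy_buckets["borders"]  # row indices delimiting the buckets
toy_buckets["borders_values"]  # revkire values at those borders, capped at 10**15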
# TODO: import from the package
# def get_fake_upper_bound(val):
#     if val == 1e15:
#         return 1e15
#     else:
#         return 10 ** (len(str(int(val))))
# get_fake_upper_bound(100.5)
# calib = get_copulas(dfv, "revkire", "revkire", 10, une_tranche_rfr)
# calib["copules"]["buckets"][-1]
# calib = calib["copules"][0]["buckets"]
# keep_upper_bound_secret(calib)
# calib
# calib["buckets"][-1]["seuil_var_supp"] = "secret"
# calib
# from IPython.display import JSON

# JSON(calib)
# %%time
# # Time on CASD: 138 s per iteration
# nb_bucket_var = 100

# for variable in tqdm(continuous_variables):
#     #calib = get_calib(dfv, variable, nb_bucket_var)
#     # print(variable)
#     calib = compute_copule_vaex(dfv, variable, nb_bucket_var, une_tranche_rfr)
#     calib["copules"][0]["buckets"][-1]["seuil_var_supp"] = "secret"
#     with open(f"{OUT_PATH}CalibPote-{nb_bucket_var}-{year}-{variable}.json", "w") as f:
#         f.write(json.dumps(calib["copules"][0]["buckets"]))
# %%time
# # Time on CASD: 538 s per iteration!
# nb_bucket_var = 1000

# for variable in tqdm(continuous_variables):
#     #calib = get_calib(dfv, variable, nb_bucket_var)
#     # print(variable)
#     calib = compute_copule_vaex(dfv, variable, nb_bucket_var, une_tranche_rfr)
#     calib["copules"][0]["buckets"][-1]["seuil_var_supp"] = "secret"
#     with open(f"{OUT_PATH}CalibPote-{nb_bucket_var}-{year}-{variable}.json", "w") as f:
#         f.write(json.dumps(calib["copules"][0]["buckets"]))

Copula extraction

nb_bucket_var = 10

# Copulas are built against the RFR, but also against individual incomes, to see whether that improves the imputation.
# Copulas against assiette_csg_revenus_capital are useful to look at how the various capital incomes are distributed within total capital income.
for copule_var in [
    "revkire",
    "revkire_par_part",
    "revenus_individuels",
    "revenus_individuels_par_part",
    "assiette_csg_revenus_capital",
]:
    centile = get_primary_buckets(
        dfv, nb_bucket_var, variable_to_split_on=copule_var, minimal_bucket_size=500
    )

    for variable in tqdm(continuous_variables):
        try:
            copule = get_copulas(
                dfv,
                copule_var,
                variable,
                nb_bucket_var,
                centile,
                minimal_bucket_size=100,
            )
            # copule["copules"][0]["buckets"][-1]["upper_bound"] = "secret"
            anonimyze_lower_and_upper_bound(copule["copules"])
            with open(
                f"{OUT_PATH}CopulePote-{nb_bucket_var}-{year}-{copule_var}-{variable}.json",
                "w",
            ) as f:
                f.write(json.dumps(copule))
        except Exception as e:
            print(f"ERROR processing {variable}", e)
            # raise e
100%|███████████████████████████████████████████| 3/3 [07:55<00:00, 158.59s/it]
CPU times: total: 7min 54s
Wall time: 7min 55s
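
As a sanity check, one of the exported files can be read back; a hedged example (assuming the revkire/rev_salaire file was written without error above):

# Hedged check: re-read one exported copula file.
with open(f"{OUT_PATH}CopulePote-{nb_bucket_var}-{year}-revkire-rev_salaire.json") as f:
    copule_check = json.load(f)
list(copule_check.keys())  # expected to include "copules", as serialized above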
# dfv.column_names
# copule = get_copulas(dfv, "revkire", variable, nb_bucket_var, centile)