CASD : Extraction d’agrégats

Les données représentes les postes occupées en 2020.

Nous pouvons les transformer en salarié en regroupant sur IDENT_S.

On peut aussi filtrer sur un salaire minimum ou un nombre d’heure de travail minimum.

from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
year = "2020"
OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\agregats\data\DADS/"
ARROW_PATH = OUT_PATH + "/../chunks/extrait_dads_" + year + r"-chunk/"
taille_chunk = 2 * 2**20  # 2**20 = 1_048_576
# taille_chunk = 5000
import leximpact_prepare_data

leximpact_prepare_data.__version__
'0.0.17'
import gc
import json
from time import time

import pandas as pd
import vaex
from tqdm import tqdm

from leximpact_prepare_data.scenario_tools.calib_and_copules import *
dfv = vaex.open(ARROW_PATH + "*")
tc.assertEqual(len(dfv), 44_653_064)
# dfv.info()
dfv.get_column_names()

Agrégats SAS

SAS_FILE = (
    r"C:\Users\Public\Documents\TRAVAIL\agregats\sas/"
    + "nb_salarie_par_secteur_a17.sas7bdat"
)
df_nb_salarie_par_secteur_a17 = pd.read_sas(SAS_FILE, encoding="iso8859-15")
SAS_FILE = (
    r"C:\Users\Public\Documents\TRAVAIL\agregats\sas/"
    + "nb_pepa_par_secteur_a17.sas7bdat"
)
df_nb_pepa_par_secteur_a17 = pd.read_sas(SAS_FILE, encoding="iso8859-15")
df_agg_pepa_secteur = df_nb_pepa_par_secteur_a17.merge(
    df_nb_salarie_par_secteur_a17, on="A17", how="inner"
)
df_agg_pepa_secteur["Pct"] = (
    df_agg_pepa_secteur.NB_PEPA / df_agg_pepa_secteur.NB_SALARIE * 100
)
df_agg_pepa_secteur.to_csv(OUT_PATH + "pepa_par_secteur_a17.csv", sep=";", index=False)
df_agg_pepa_secteur
A17 NB_PEPA PEPA_SUM PEPA_MEAN NB_SALARIE S_BRUT_par_JOUR Pct
0 NaN 1.0 4.000000e+02 400.000000 245.0 58.532196 0.408163
1 AZ 66305.0 3.763077e+07 567.540511 574232.0 64.149876 11.546727
2 C1 251476.0 1.420695e+08 564.942438 817201.0 82.936438 30.772845
3 C2 2792.0 1.969040e+06 705.243510 9832.0 181.480443 28.397071
4 C3 115311.0 6.070689e+07 526.462239 454483.0 124.204528 25.371906
5 C4 72339.0 3.877711e+07 536.047020 391375.0 130.428609 18.483296
6 C5 449003.0 2.646628e+08 589.445566 1618621.0 103.918980 27.739848
7 DE 148835.0 9.192165e+07 617.607781 449034.0 119.675511 33.145597
8 FZ 270254.0 1.758841e+08 650.810303 1934709.0 83.340986 13.968716
9 GZ 1119707.0 6.952365e+08 620.909276 4391236.0 82.213857 25.498675
10 HZ 433803.0 1.955396e+08 450.756765 1717430.0 87.383688 25.258846
11 IZ 116660.0 4.827930e+07 413.846184 1875430.0 44.749340 6.220440
12 JZ 90158.0 6.520785e+07 723.261886 1124467.0 139.201441 8.017843
13 KZ 370066.0 2.984689e+08 806.528789 1021839.0 155.796695 36.215686
14 LZ 71180.0 4.171407e+07 586.036408 345456.0 97.368482 20.604650
15 MN 739969.0 3.686175e+08 498.152585 6217838.0 92.081551 11.900744
16 OQ 925500.0 5.835964e+08 630.574217 10113835.0 73.541314 9.150832
17 RU 119045.0 6.319583e+07 530.856652 2291876.0 26.249410 5.194216
SAS_FILE = (
    r"C:\Users\Public\Documents\TRAVAIL\agregats\sas/"
    + "nb_salarie_par_secteur_a88.sas7bdat"
)
df_nb_salarie_par_secteur_a88 = pd.read_sas(SAS_FILE, encoding="iso8859-15")
SAS_FILE = (
    r"C:\Users\Public\Documents\TRAVAIL\agregats\sas/"
    + "nb_pepa_par_secteur_a88.sas7bdat"
)
df_nb_pepa_par_secteur_a88 = pd.read_sas(SAS_FILE, encoding="iso8859-15")
df_agg_pepa_secteur_a88 = df_nb_pepa_par_secteur_a88.merge(
    df_nb_salarie_par_secteur_a88, on="A88", how="inner"
)
df_agg_pepa_secteur_a88["Pct"] = (
    df_agg_pepa_secteur_a88.NB_PEPA / df_agg_pepa_secteur_a88.NB_SALARIE * 100
)
df_agg_pepa_secteur_a88.to_csv(
    OUT_PATH + "pepa_par_secteur_a88.csv", sep=";", index=False
)
df_agg_pepa_secteur_a88.sort_values("Pct").head(2)
df_agg_pepa_secteur_a88.sort_values("Pct").tail(10)
A88 NB_PEPA PEPA_SUM PEPA_MEAN NB_SALARIE S_BRUT_par_JOUR Pct
85 99 20.0 19100.0 955.0 7688.0 113.034673 0.260146
0 NaN 1.0 400.0 400.0 245.0 58.532196 0.408163
A88 NB_PEPA PEPA_SUM PEPA_MEAN NB_SALARIE S_BRUT_par_JOUR Pct
9 11 20607.0 1.415329e+07 686.819606 65351.0 121.244146 31.532800
35 38 46943.0 3.759974e+07 800.965940 148593.0 86.030216 31.591663
57 65 67325.0 5.164326e+07 767.074059 195170.0 147.668996 34.495568
18 20 61064.0 4.038124e+07 661.293682 171417.0 144.182110 35.623071
32 35 71446.0 2.957919e+07 414.007619 187412.0 154.228654 38.122425
56 64 255801.0 2.171700e+08 848.980308 606516.0 164.088416 42.175474
13 15 18912.0 1.435316e+07 758.944631 39806.0 86.434977 47.510426
19 21 41962.0 3.460768e+07 824.738565 87454.0 151.717081 47.981796
15 17 32983.0 1.886584e+07 571.986815 66723.0 106.174752 49.432729
47 53 125863.0 3.909229e+07 310.593990 252450.0 83.932159 49.856605
SAS_FILE = (
    r"C:\Users\Public\Documents\TRAVAIL\agregats\sas/nb_salarie_par_effectif.sas7bdat"
)
df_nb_salarie = pd.read_sas(SAS_FILE, encoding="iso8859-15")
SAS_FILE = (
    r"C:\Users\Public\Documents\TRAVAIL\agregats\sas/nb_pepa_par_effectif.sas7bdat"
)
df_nb_pepa = pd.read_sas(SAS_FILE, encoding="iso8859-15")
df_agg_pepa_eff = df_nb_pepa.merge(df_nb_salarie, on="TREFFECT", how="inner")
df_agg_pepa_eff["Pct"] = df_agg_pepa_eff.NB_PEPA / df_agg_pepa_eff.NB_SALARIE * 100
df_agg_pepa_eff.to_csv(OUT_PATH + "pepa_par_effectif.csv", sep=";", index=False)
df_agg_pepa_eff
TREFFECT NB_PEPA PEPA_SUM PEPA_MEAN NB_SALARIE S_BRUT_par_JOUR Pct
0 00 89542.0 4.856310e+07 542.349986 2964236.0 20.771539 3.020745
1 01 476560.0 3.064212e+08 642.985530 5801395.0 55.538395 8.214576
2 02 532028.0 3.328683e+08 625.659338 4136059.0 74.198121 12.863163
3 03 603046.0 3.661868e+08 607.228708 3934891.0 80.527129 15.325609
4 04 926260.0 5.331160e+08 575.557657 5247148.0 84.580526 17.652637
5 05 758328.0 4.573412e+08 603.091535 4144277.0 88.688104 18.298198
6 06 851186.0 4.732948e+08 556.041520 4548011.0 96.460628 18.715566
7 07 546619.0 3.236595e+08 592.111770 2711072.0 102.879276 20.162467
8 08 292423.0 1.778463e+08 608.181553 2033400.0 110.224798 14.380988
9 09 172619.0 9.792508e+07 567.290247 1574245.0 113.348869 10.965193
10 10 103672.0 5.103256e+07 492.250151 1399887.0 107.068349 7.405741
11 11 10121.0 5.223316e+06 516.086899 909791.0 100.846811 1.112453
df_agg_pepa_secteur.Pct.mean()
df_agg_pepa_secteur_a88.Pct.mean()
df_agg_pepa_eff.Pct.mean()

df_agg_pepa_secteur.NB_SALARIE.sum()
df_agg_pepa_secteur_a88.NB_SALARIE.sum()
df_agg_pepa_eff.NB_SALARIE.sum()
18.772005615154356
18.468614967471087
12.343111199372116
35349139.0
35775761.0
39404412.0

Variables catégorielles

categorical = ["A17", "A88", "CONTRAT_TRAVAIL", "CRIS", "MOTIFCDD", "TREFFECT"]
categorical = [c.lower() for c in categorical]
import numpy as np

# Temps d'éxécution : 11s

for col in tqdm(categorical):
    df_col = dfv.groupby(
        by=col,
        sort=True,
        agg={
            "count": vaex.agg.count(col),
        },
    )
    df = df_col.to_pandas_df()
    df.loc[df["count"] <= 12, "count"] = np.nan
    df.to_csv(f"{OUT_PATH}count_{year}_{col}.csv", index=False)

Variables continues

continuous_variables = [
    "EFF_3112",
    "NET",
    "PEPA",
    "S_BRUT",
]
continuous_variables = [c.lower() for c in continuous_variables]

Calcul d’agregats

def compute_agg(vdf, columns):
    sub_total = []
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    ldf = vdf.shape[0]
    columns = columns if columns else vdf.get_column_names()
    for col in tqdm(columns):
        # print(col)
        name = f"{col}_non_zero"
        vdf.select(f"{col} != 0", name=name)
        nb_no_zero = int(vdf.count("*", selection=name))
        lenzero = ldf - nb_no_zero
        dict_col = {
            "name": col,
            "nb_line": ldf,
            "lenzero": lenzero,
            "pct_zero": lenzero / ldf * 100,
            "sum": int(vdf.sum(col)),
            "mean": float(vdf.mean(col, selection=name)) if nb_no_zero > 0 else 0.0,
            "variance": float(vdf.var(col, selection=name)) if nb_no_zero > 0 else 0.0,
            "std_dev": float(vdf.std(col, selection=name)) if nb_no_zero > 0 else 0.0,
        }
        sub_total.append(dict_col)
    return pd.DataFrame(sub_total)
# Temps sur CASD : 30s par colonne
df_agg = compute_agg(dfv, continuous_variables)
pd.set_option("display.float_format", "{:,}".format)
# Export dans un fichier
df_agg.to_csv(OUT_PATH + "/agregats_DADS_" + year + ".csv", index=False)
df_agg

Calcul des quantiles

def compute_quantile(vdf, columns=None, quantiles=10):
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    vdf.shape[0]
    columns = columns if columns else vdf.get_column_names()
    for col in tqdm(columns):
        try:
            # print(col)
            q = Quantile(vdf[col].tolist())
            for quantile in quantiles:
                q_dict = q.get_quantile(quantile)
                # keep_upper_bound_secret(q_dict)
                with open(
                    f"{OUT_PATH}/quantile_DADS_{quantile}_{year}_{col}.json", "w"
                ) as f:
                    f.write(json.dumps(q_dict))
            del q
            gc.collect()
        except Exception as e:
            print(f"ERROR processing {col} {e.__class__.__name__} : {e}")
            continue

Déciles et centiles

# Temps sur CASD : 5 minutes par colonne
compute_quantile(dfv, continuous_variables, quantiles=[10, 100])
# dfv

Calcul de calibration

calib_base_variable = "s_brut"
# dfv["CONTRAT_TRAVAIL"] = dfv["CONTRAT_TRAVAIL"].fillna(0)
# dfv["CONTRAT_TRAVAIL"] = dfv["CONTRAT_TRAVAIL"].astype('int')
# dfv = dfv.sort("CONTRAT_TRAVAIL")

##dfv["S_BRUT"] = dfv["S_BRUT"].astype('int')
dfv = dfv.sort(calib_base_variable)
# tc.assertEqual(dfv["S_BRUT"].count(), 39264696)
# < 1 minute
une_tranche_rfr = get_primary_buckets(dfv, 1, variable_to_split_on=calib_base_variable)
une_tranche_rfr

On sort les distributions de chaque variable continue

# Temps sur CASD : 30 minutes pour 4 colonnes
nb_bucket_var = 100

for variable in tqdm(continuous_variables):
    try:
        calib = get_copulas(
            dfv, calib_base_variable, variable, nb_bucket_var, une_tranche_rfr
        )
        calib = calib["copules"][0]["buckets"]
        if isinstance(calib, str):
            print(f"ERROR {variable} calib is '{calib}'")
            continue
        keep_upper_bound_secret(calib)
        with open(
            f"{OUT_PATH}Distrib_DADS-{nb_bucket_var}-{year}-{variable}.json", "w"
        ) as f:
            f.write(json.dumps(calib))
    except Exception as e:
        print(f"ERROR processing {variable}", e)
        raise e
# from IPython.display import JSON

# JSON(calib)
# %%time
# # Temps sur CASD : 138s par iteration
# nb_bucket_var = 100

# for variable in tqdm(continuous_variables):
#     #calib = get_calib(dfv, variable, nb_bucket_var)
#     # print(variable)
#     calib = compute_copule_vaex(dfv, variable, nb_bucket_var, une_tranche_rfr)
#     calib["copules"][0]["buckets"][-1]["seuil_var_supp"] = "secret"
#     with open(f"{OUT_PATH}CalibPote-{nb_bucket_var}-{year}-{variable}.json", "w") as f:
#         f.write(json.dumps(calib["copules"][0]["buckets"]))
# %%time
# # Temps sur CASD : 538s par iteration !
# nb_bucket_var = 1000

# for variable in tqdm(continuous_variables):
#     #calib = get_calib(dfv, variable, nb_bucket_var)
#     # print(variable)
#     calib = compute_copule_vaex(dfv, variable, nb_bucket_var, une_tranche_rfr)
#     calib["copules"][0]["buckets"][-1]["seuil_var_supp"] = "secret"
#     with open(f"{OUT_PATH}CalibPote-{nb_bucket_var}-{year}-{variable}.json", "w") as f:
#         f.write(json.dumps(calib["copules"][0]["buckets"]))

Extraction de Copules

On ne sort des copules que pour: - PEPA par salaire brut - PEPA par type de contrat de travail - PEPA par secteur d’activité

# Uniquement les CDI
dfv_cdi = dfv[dfv.contrat_travail == "01"]  # CDI
dfv_cdi = dfv_cdi[dfv_cdi.s_brut > 0]  # Salaire positif
dfv_cdi = dfv_cdi[dfv_cdi.duree > 160]  # Plus de 160 jours par an
# dfv_cdi = dfv.evaluate(selection="cdi")
len(dfv_cdi)
# %%time
# variable = None
# del variable
# gc.collect()
# variable = dfv_cdi.to_arrays(column_names=["s_brut"], selection=False, array_type="python")[0]  # dfv_cdi.to_pandas_df("s_brut", array_type="list")
# type(variable)
# Redéfinition à migrer !
from typing import Dict, List


def get_primary_buckets(
    vdx_sort: vaex.dataframe.DataFrameLocal,
    nb_bucket: int,
    variable_to_split_on: str = "revkire",
    minimal_bucket_size=12,
    debug=False,
) -> Dict:
    """
    Objectif: Split the variable in buckets
    Dans chaque bucket on stocke toutes les valeurs non nulles de "variable"
    ::vdx_sort:: Le dataset, trié selon la variable à étudier
    ::nb_bucket:: Nombre de tranches souhaitées
    ::variable_to_split_on:: Variable on which to split buckets
    ::debug:: Pour activer un mode debug, qui affiche des traces
    """
    dataset_size = vdx_sort.shape[0]  # Nb de lignes
    # Conversion en array
    variable_array = vdx_sort.to_arrays(
        column_names=["s_brut"], selection=False, array_type="python"
    )[0]
    # On vérifie que le dataset est bien trié
    previous = variable_array[-1]
    for i in range(1, 1000):
        idx = dataset_size // i
        idx = idx if idx != dataset_size else dataset_size - 1
        if previous < variable_array[idx]:
            raise DatasetNotSorted(
                f"Your dataset is not sorted on {variable_to_split_on}!"
            )
        previous = variable_array[idx]

    # Découpage du RFR en buckets:
    borders = get_borders(
        dataset_size=dataset_size,
        nb_bucket=nb_bucket,
        minimal_bucket_size=minimal_bucket_size,
        debug=debug,
    )

    # On retire la dernière frontière pour éviter des tests (index out of range), on la remetra après
    borders = borders[:-1]
    i = 0
    # On supprime les frontières qui n'auraient que du 0
    while i < len(borders):
        if variable_array[borders[i]] < 1:
            if debug:
                print(
                    f"WARNING: On efface la frontière d'index {i} : {borders[i]} inutile car valeur de la borne haute est {variable_array[borders[i]]}"
                )
            borders = borders[:i] + borders[i + 1 :]
        else:
            i += 1
    frontieres_valeurs = [0] + [variable_array[frontiere] for frontiere in borders]
    # On ajoute une valeur de fin trés haute (10^15€)
    frontieres_valeurs += [10**15]
    # On remet la dernière frontière
    borders += [dataset_size]
    dic = {"borders_values": frontieres_valeurs, "borders": borders}
    del variable_array
    gc.collect()
    return dic
calib_base_variable = "s_brut"
decile_salaire = get_primary_buckets(
    dfv_cdi, 10, variable_to_split_on=calib_base_variable, debug=False
)

nb_bucket_var = 10
def get_copulas(
    vdf: vaex.dataframe.DataFrameLocal,
    primary_variable: str,
    variable: str,
    nb_bucket_var: int,
    primary_buckets: List,
    debug=False,
    minimal_bucket_size=12,
):
    """
    On nous donne des tranches de RFR, en nombre de personne, et en valeur de RFR
    Pour chacune de ses tranches on doit extraire les valeurs de 'variable'
    On ne garde que celle supérieure à 0 et on les envoie à DistribDeVarVaex
    ::vdf:: Le jeux de données
    ::variable:: Nom de la variable secondaire.
    ::nb_bucket_var:: Nombre de tranches de variable secondaire souhaités.
    ::primary_buckets:: La liste des tranches de RFR.
    ::debug:: Pour activer un mode debug, qui affiche des traces.
    ::minimal_bucket_size:: Nombre minimal d'individus pour respecter le secret statistique.
    """
    controle = []
    copules = []
    frontieres_valeurs = primary_buckets["borders_values"]
    borders = primary_buckets["borders"]

    if primary_variable in vdf.get_column_names():
        primary_variable = primary_variable
    else:
        primary_variable = vdf.get_column_names()[0]

    # Conversion en array
    primary_variable_array = vdf.to_arrays(
        column_names=[primary_variable], selection=False, array_type="python"
    )[0]
    dataset_size = len(primary_variable_array)
    # On vérifie que le dataset est bien trié
    previous = primary_variable_array[-1]
    for i in range(1, 1000):
        idx = dataset_size // i
        idx = idx if idx != dataset_size else dataset_size - 1
        if previous < primary_variable_array[idx]:
            raise DatasetNotSorted(
                f"Your dataset is not sorted on {variable_to_split_on}!"
            )
        previous = primary_variable_array[idx]

    # On parcourt les frontières de FF (= les index dans le tableau)
    idx_inf = 0

    debut = time()
    # On ne peut malheureusement pas filtrer par > 0 avant extraction car cela fausserait le nombre de valeur
    variable_all_values = vdf.to_arrays(
        column_names=[variable], selection=False, array_type="python"
    )[0]

    # On fait l'hypothèse que c'est bien trié par ordre croissant
    lower_bound = primary_variable_array[idx_inf]
    if debug:
        print(f"Temps d'extraction par to_arrays  {time()-debut}")
    for i, idx_sup in enumerate(borders):
        starttime = time()
        upper_bound = frontieres_valeurs[i + 1]  # Car frontieres_valeurs contient 0
        variable_values = variable_all_values[idx_inf:idx_sup]
        # nb_ff = vdf_tmp.shape[0]
        nb_ff = len(variable_values)
        if debug:
            print(f"-----------------Temps après slice {time()-starttime}")
        assert nb_ff == idx_sup - idx_inf
        # Quand il y a beaucoup de personne ayant le même revenu on peut avec des tranches avec lower_bound=upper_bound, mais ce n'est pas gênant
        if (
            primary_variable_array[idx_inf] != lower_bound
            and lower_bound != upper_bound
        ):
            print(
                f"get_copulas {i} WARNING: Il y a peut-être un problème car le RFR du premier index (idx_inf={idx_inf}) est {primary_variable_array[idx_inf]} alors que lower_bound vaut {lower_bound}"
            )
        if (
            i != len(borders) - 1
            and primary_variable_array[idx_sup] != upper_bound
            and lower_bound != upper_bound
        ):
            print(
                f"get_copulas {i} WARNING: Il y a peut-être un problème car le RFR du dernier index (idx_sup={idx_sup}) est {primary_variable_array[idx_sup]} alors que upper_bound vaut {upper_bound}"
            )
        # Remove 0
        variable_values = [v for v in variable_values if v > 0]

        if debug:
            print(f"Temps avant sort {time()-starttime}")
        # Tri des variables : sort() est plus rapide que sorted, mais écrase notre liste
        variable_values.sort()
        # variable_values = sorted(variable_values)
        if debug:
            print(f"Temps après sort {time()-starttime}")
        if debug:
            print(
                f"get_copulas {i} : index entre idx_inf={idx_inf} et idx_sup={idx_sup} - RFR entre lower_bound={lower_bound} et upper_bound={upper_bound} - {len(variable_values)} valeurs différentes de zéro."
            )
            if len(variable_values) > 0:
                print(
                    f"\tmin(variable_values)={min(variable_values)} max(variable_values)={max(variable_values)}"
                )
        if len(variable_values) > idx_sup - idx_inf:
            print(
                f"get_copulas ERROR i={i} len(variable_values)={len(variable_values)} != {idx_sup - idx_inf}"
            )
        assert len(variable_values) <= (idx_sup - idx_inf)
        if debug:
            DistribDeVar_time = time()
        bdr = DistribDeVarVaex(
            variable_values=variable_values,
            variable=variable,
            nb_ff=nb_ff,
            nb_bucket_var=nb_bucket_var,
            lower_bound=lower_bound,
            upper_bound=upper_bound,
            debug=debug,
            minimal_bucket_size=minimal_bucket_size,
        )
        if debug:
            print(f"Temps de DistribDeVarVaex {time()-DistribDeVar_time}")
        # Et on ajoute ce tableau à la liste des tableaux
        copules += [bdr.to_dict()]

        idx_inf = idx_sup
        lower_bound = upper_bound
        if debug:
            print(f"Temps après fin de la boucle {time()-starttime} --------------")
        if debug and i > 10:
            print("DEBUG EXIT !!!")
            break
    dico = {"controle": controle, "copules": copules}

    return dico
variable = "pepa"
try:
    copule = get_copulas(
        dfv_cdi, calib_base_variable, variable, nb_bucket_var, decile_salaire
    )
    with open(
        f"{OUT_PATH}CopuleDADS_CDI_{calib_base_variable}-{nb_bucket_var}-{year}-{variable}.json",
        "w",
    ) as f:
        f.write(json.dumps(copule))
except Exception as e:
    print(f"ERROR processing {variable}", e)
    # raise e
copule["copules"]
del dfv
gc.collect()