CASD: Converting the SAS extraction to Apache Parquet and reprocessing some raw POTE variables

from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
year = "2019"
# year = "2019"
# year = "2018"
SAS_FILE = (
    r"C:\Users\Public\Documents\TRAVAIL\agregats\sas/agregats_pote_"
    + year
    + ".sas7bdat"
)
OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\agregats\data/"
OUT_PATH = OUT_PATH + "assiettes_pote_brutes_" + year + "-chunk/"
taille_chunk = 2 * 2**20  # 2**20 = 1_048_576
# taille_chunk = 5000
# taille_chunk = 600_000
import shutil
from pathlib import Path

import pandas as pd
import vaex
from tqdm import tqdm
# sas_col = "FIP18_c revkire rimp tsirna rbg srbg mnrvr3 mnrvi2 mnrvk mnrvni Z8uy nbpart zn nbefi nbfoy nbpldm mat agec aged clirpg frf Z1ak Z1bk txmoy impot impotnet j rnirp8 rnimeh tsirna mnipeg rnirai rnirdu rnirgi f g h i r p Z1az Z1bz stutile zf zp".split(    " ")
# sas_col = "FIP18_c revkire rimp tsirna rbg srbg mnrvr3 mnrvi2 mnrvk mnrvni Z8uy nbpart zn nbefi nbfoy nbpldm mat agec aged clirpg frf Z1ak Z1bk txmoy impot impotnet j rnirp8 rnimeh tsirna mnipeg rnirai rnirdu rnirgi f g h i r p Z1az Z1bz stutile zf zp".split(
#     " "
# )

Processing the data from the SAS file

# Sum the columns of interest; for now this is only valid for 2019
def sum_columns(df):
    # For the CSG base
    df["revenus_capitaux_prelevement_bareme"] = df.z2ch
    df["revenus_capitaux_prelevement_liberatoire"] = df.z2dh + df.z2xx
    df["revenus_capitaux_prelevement_forfaitaire_unique_ir"] = df.z2dc + df.z2tr + df.z2ts + df.z2ww + df.z2zz + df.z2fu + df.z2tt
    df["rente_viagere_titre_onereux_net"] = df.z1cw + df.z1dw
    # Micro-foncier: apply the 30% allowance
    df["revenu_categoriel_foncier"] = (
        df.z4ba + (df.z4be * 0.7) - (df.z4bb + df.z4bc + df.z4bd)
    )
    # df['rev_categ_foncier4ba']=df.Z4ba
    df["assiette_csg_plus_values"] = df.z3vg + df.z3ua - df.z3va
    df["assiette_csg_revenus_capital"] = (
        df["revenus_capitaux_prelevement_bareme"]
        + df["revenus_capitaux_prelevement_liberatoire"]
        + df["revenus_capitaux_prelevement_forfaitaire_unique_ir"]
        + df["rente_viagere_titre_onereux_net"]
        + df["revenu_categoriel_foncier"]
        + df["assiette_csg_plus_values"]
    )

    # Autres
    df["retraites"] = df.z1as + df.z1bs  # + df.mnimqg
    #     df[
    #         "pre_retraites_etranger"
    #     ] = df.z8sc  # Warning: only from abroad; otherwise it would be df.Z1ap + df.Z1bp

    df["chomage_et_indemnites"] = df.z1ap + df.z1bp

    df["rev_salaire"] = df.z1aj + df.z1bj + df.z1cj + df.z8tk + df.z1af + df.z1ag + df.z1aa + df.z1ba + df.z1gb + df.z1hb + df.z1gf + df.z1hf + df.z1gg + df.z1hg +
                        df.z1aq + df.z1bq + df.z1gh + df.z1hh 

    # Income of the self-employed
    df["rag"] = (
        df.z5hd + df.z5id + df.z5hb + df.z5ib + df.z5hh + df.z5ih + df.z5hc + df.z5ic
        + df.z5hi + df.z5ii + df.z5ak + df.z5bk + df.z5al + df.z5bl
        - df.z5hf - df.z5if - df.z5hl - df.z5il
        + df.z5hm + df.z5im + df.z5hz + df.z5iz + df.z5xa + df.z5ya + df.z5xb + df.z5yb
        + df.z5xt + df.z5xu + df.z5xv + df.z5xw
    )
    
    df["ric"] = (
        df.z5ta + df.z5ua + df.z5tb + df.z5ub + df.z5kn + df.z5ln + df.z5ko + df.z5lo
        + df.z5kp + df.z5lp + df.z5kb + df.z5lb + df.z5kh + df.z5lh + df.z5kc + df.z5lc
        + df.z5ki + df.z5li + df.z5df + df.z5ef + df.z5dg + df.z5eg
        - df.z5kf - df.z5lf - df.z5kl - df.z5ll
    )

    df["rnc"] = (
        df.z5te + df.z5ue + df.z5hp + df.z5ip + df.z5hq + df.z5iq + df.z5qb + df.z5rb
        + df.z5qh + df.z5rh + df.z5qc + df.z5rc + df.z5qi + df.z5ri + df.z5xj + df.z5yj
        + df.z5xk + df.z5yk - df.z5qe - df.z5re - df.z5qk - df.z5rk
        + df.z5ql + df.z5rl + df.z5qm + df.z5rm
    )

    df["pension_invalidite"] = df.z1az + df.z1bz

    df["pension_alimentaire"] = df.z1ao + df.z1bo

    df["revenus_individuels"] = df["rev_salaire"] + df["retraites"] + df["chomage_et_indemnites"] + df["rag"] + df["ric"] + df["rnc"] + df["pension_invalidite"] + df.["pension_alimentaire"]

    df["revenus_individuels_par_part"] = df.revenus_individuels / df.nbpart
    df["revkire_par_part"] = df.revkire / df.nbpart
    
    # Drop the raw cells that have been aggregated above
    df = df.drop(columns=[
        'z2ch', 'z2dh', 'z2xx', 'z2dc', 'z2tr', 'z2ts', 'z2ww', 'z2zz', 'z2fu', 'z2tt', 'z1cw', 'z1dw', 'z4ba', 'z4be', 'z4bb', 'z4bc', 'z4bd',
        'z3vg', 'z3ua', 'z3va', 'z1as', 'z1bs', 'z1ap', 'z1bp', 'z1aj', 'z1bj', 'z1cj', 'z8tk', 'z1af', 'z1ag', 'z1aa', 'z1ba', 'z1gb',
        'z1hb', 'z1gf', 'z1hf', 'z1gg', 'z1hg', 'z1aq', 'z1bq', 'z1gh', 'z1hh', 'z5hd', 'z5id', 'z5hb', 'z5ib', 'z5hh', 'z5ih',
        'z5hc', 'z5ic', 'z5hi', 'z5ii', 'z5ak', 'z5bk', 'z5al', 'z5bl', 'z5hf', 'z5if', 'z5hl', 'z5il', 'z5hm', 'z5im', 'z5hz',
        'z5iz', 'z5xa', 'z5ya', 'z5xb', 'z5yb', 'z5xt', 'z5xu', 'z5xv', 'z5xw', 'z5ta', 'z5ua', 'z5tb', 'z5ub', 'z5kn', 'z5ln',
        'z5ko', 'z5lo', 'z5kp', 'z5lp', 'z5kb', 'z5lb', 'z5kh', 'z5lh', 'z5kc', 'z5lc', 'z5ki', 'z5li', 'z5df', 'z5ef', 'z5dg',
        'z5eg', 'z5kf', 'z5lf', 'z5kl', 'z5ll', 'z5te', 'z5ue', 'z5hp', 'z5ip', 'z5hq', 'z5iq', 'z5qb', 'z5rb', 'z5qh', 'z5rh',
        'z5qc', 'z5rc', 'z5qi', 'z5ri', 'z5xj', 'z5yj', 'z5xk', 'z5yk', 'z5qe', 'z5re', 'z5qk', 'z5rk', 'z5ql', 'z5rl', 'z5qm', 'z5rm',
        'z1az', 'z1bz', 'z1ao', 'z1bo'])
    return df

def clean_chunk(chunk):
    chunk.columns = [c.lower() for c in chunk.columns.to_list()]
    chunk.fillna(0, inplace=True)
    chunk = chunk.astype({"stutile": "str"})
    chunk = chunk.astype({"fip18_c": "str"})
    chunk = chunk.replace({"tsirna": {"+": 1, "-": "-1"}, "srbg": {"+": 1, "-": "-1"}})
    chunk = chunk.astype({"tsirna": "int32", "srbg": "int32"})
    chunk["rnirp8"] = chunk["rnirp8"] * chunk["tsirna"]
    chunk["rbg"] = chunk["rbg"] * chunk["srbg"]
    chunk["f+h"] = chunk["f"] + chunk["h"]
    # chunk["zp+zf"] = chunk["zp"] + chunk["zf"] => Its a boolean !!!
    chunk.drop(["tsirna", "srbg"], axis=1, inplace=True)
    chunk = sum_columns(chunk)

    return chunk

Reading the SAS file

We read the file in chunks of roughly 2 million rows (2 * 2**20), so as not to saturate memory. There are about 39 million rows in total.

The chunks are saved as we go, one Apache Parquet file per chunk.
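
With taille_chunk = 2 * 2**20 = 2 097 152 rows per chunk and 39 512 402 rows in the source file, we expect 39 512 402 / 2 097 152 ≈ 18.8, i.e. 19 iterations, which matches the loop output below.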

# Time on CASD: about 20 minutes (22 min wall time on this run).


# Clear the output directory
shutil.rmtree(OUT_PATH, ignore_errors=True)
Path(OUT_PATH).mkdir(parents=True, exist_ok=True)

dfi = pd.read_sas(
    SAS_FILE, chunksize=taille_chunk, encoding="iso8859-15", iterator=True
)

dd_values = None
i = 0
print(f"Nombre d'itérations : {39512402/taille_chunk:.0f}")
for chunk in tqdm(dfi):
    chunk = clean_chunk(chunk)
    dd_values = vaex.from_pandas(chunk, copy_index=False)
    dd_values.export(f"{OUT_PATH}pote_brutes_{year}_{i}.parquet")
    del dd_values
    dd_values = None
    #### DEBUG
    i += 1
    #     if i>=2:
    #         break
    #### DEBUG
Number of iterations: 19
CPU times: total: 20min 54s
Wall time: 22min
19it [22:00, 69.48s/it]

Verification

# Display the last processed chunk as a quick sanity check
chunk

from leximpact_prepare_data.calib_and_copules import tc

dfv = vaex.open(f"{OUT_PATH}pote_brutes_{year}_*.parquet")
tc.assertEqual(len(dfv), 39_264_696)  # 39_512_402
tc.assertGreaterEqual(dfv["revkire"].count(), 36644848)
tc.assertGreaterEqual(dfv["revkire"].sum(), 1_084_000_000_000)