CASD: Converting the SAS extraction to Apache Parquet + reprocessing of some raw POTE variables

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
year = "2019"
# year = "2019"
# year = "2018"
SAS_FILE = (
r"C:\Users\Public\Documents\TRAVAIL\agregats\sas/agregats_pote_"
+ year
+ ".sas7bdat"
)
OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\agregats\data/"
OUT_PATH = OUT_PATH + "assiettes_pote_brutes_" + year + "-chunk/"
taille_chunk = 2 * 2**20 # 2**20 = 1_048_576
# taille_chunk = 5000
# taille_chunk = 600_000

import shutil
from pathlib import Path
import pandas as pd
import vaex
from tqdm import tqdm

# sas_col = "FIP18_c revkire rimp tsirna rbg srbg mnrvr3 mnrvi2 mnrvk mnrvni Z8uy nbpart zn nbefi nbfoy nbpldm mat agec aged clirpg frf Z1ak Z1bk txmoy impot impotnet j rnirp8 rnimeh tsirna mnipeg rnirai rnirdu rnirgi f g h i r p Z1az Z1bz stutile zf zp".split(" ")
# sas_col = "FIP18_c revkire rimp tsirna rbg srbg mnrvr3 mnrvi2 mnrvk mnrvni Z8uy nbpart zn nbefi nbfoy nbpldm mat agec aged clirpg frf Z1ak Z1bk txmoy impot impotnet j rnirp8 rnimeh tsirna mnipeg rnirai rnirdu rnirgi f g h i r p Z1az Z1bz stutile zf zp".split(
# " "
Processing the SAS file data
# Sum the columns of interest; for now this is only valid for 2019
def sum_columns(df):
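    # NB: the z* columns below are presumably boxes from the French income-tax
    # return (déclaration 2042) as they appear in the raw POTE extraction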
    # CSG base (assiette de la CSG)
df["revenus_capitaux_prelevement_bareme"] = df.z2ch
df["revenus_capitaux_prelevement_liberatoire"] = df.z2dh + df.z2xx
df["revenus_capitaux_prelevement_forfaitaire_unique_ir"] = df.z2dc + df.z2tr + df.z2ts + df.z2ww + df.z2zz + df.z2fu + df.z2tt
df["rente_viagere_titre_onereux_net"] = df.z1cw + df.z1dw
    # Micro-foncier: apply the 30% flat allowance
df["revenu_categoriel_foncier"] = (
df.z4ba + (df.z4be * 0.7) - (df.z4bb + df.z4bc + df.z4bd)
)
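    # e.g. 1000 declared in z4be counts for 700 after the 30% allowance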
# df['rev_categ_foncier4ba']=df.Z4ba
df["assiette_csg_plus_values"] = df.z3vg + df.z3ua - df.z3va
df["assiette_csg_revenus_capital"] = (
df["revenus_capitaux_prelevement_bareme"]
+ df["revenus_capitaux_prelevement_liberatoire"]
+ df["revenus_capitaux_prelevement_forfaitaire_unique_ir"]
+ df["rente_viagere_titre_onereux_net"]
+ df["revenu_categoriel_foncier"]
+ df["assiette_csg_plus_values"]
)
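    # i.e. the total CSG base on capital income is the sum of all components above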
    # Other income
df["retraites"] = df.z1as + df.z1bs # + df.mnimqg
    # df[
    #     "pre_retraites_etranger"
    # ] = df.z8sc  # careful: foreign-source only; otherwise it would be df.Z1ap + df.Z1bp
df["chomage_et_indemnites"] = df.z1ap + df.z1bp
df["rev_salaire"] = df.z1aj + df.z1bj + df.z1cj + df.z8tk + df.z1af + df.z1ag + df.z1aa + df.z1ba + df.z1gb + df.z1hb + df.z1gf + df.z1hf + df.z1gg + df.z1hg +
df.z1aq + df.z1bq + df.z1gh + df.z1hh
    # Self-employment income (revenus des non-salariés)
    df["rag"] = (
        df.z5hd + df.z5id + df.z5hb + df.z5ib + df.z5hh + df.z5ih + df.z5hc
        + df.z5ic + df.z5hi + df.z5ii + df.z5ak + df.z5bk + df.z5al + df.z5bl
        - df.z5hf - df.z5if - df.z5hl - df.z5il + df.z5hm + df.z5im + df.z5hz
        + df.z5iz + df.z5xa + df.z5ya + df.z5xb + df.z5yb + df.z5xt + df.z5xu
        + df.z5xv + df.z5xw
    )
    df["ric"] = (
        df.z5ta + df.z5ua + df.z5tb + df.z5ub + df.z5kn + df.z5ln + df.z5ko
        + df.z5lo + df.z5kp + df.z5lp + df.z5kb + df.z5lb + df.z5kh + df.z5lh
        + df.z5kc + df.z5lc + df.z5ki + df.z5li + df.z5df + df.z5ef + df.z5dg
        + df.z5eg - df.z5kf - df.z5lf - df.z5kl - df.z5ll
    )
    df["rnc"] = (
        df.z5te + df.z5ue + df.z5hp + df.z5ip + df.z5hq + df.z5iq + df.z5qb
        + df.z5rb + df.z5qh + df.z5rh + df.z5qc + df.z5rc + df.z5qi + df.z5ri
        + df.z5xj + df.z5yj + df.z5xk + df.z5yk - df.z5qe - df.z5re - df.z5qk
        - df.z5rk + df.z5ql + df.z5rl + df.z5qm + df.z5rm
    )
df["pension_invalidite"] = df.z1az + df.z1bz
df["pension_alimentaire"] = df.z1ao + df.z1bo
df["revenus_individuels"] = df["rev_salaire"] + df["retraites"] + df["chomage_et_indemnites"] + df["rag"] + df["ric"] + df["rnc"] + df["pension_invalidite"] + df.["pension_alimentaire"]
df["revenus_individuels_par_part"] = df.revenus_individuels / df.nbpart
df["revkire_par_part"] = df.revkire / df.nbpart
    # Drop the raw boxes now that the aggregates have been computed
    df.drop(
        columns=['z2ch', 'z2dh', 'z2xx', 'z2dc', 'z2tr', 'z2ts', 'z2ww', 'z2zz', 'z2fu', 'z2tt', 'z1cw', 'z1dw', 'z4ba', 'z4be', 'z4bb', 'z4bc', 'z4bd',
                 'z3vg', 'z3ua', 'z3va', 'z1as', 'z1bs', 'z1ap', 'z1bp', 'z1aj', 'z1bj', 'z1cj', 'z8tk', 'z1af', 'z1ag', 'z1aa', 'z1ba', 'z1gb',
                 'z1hb', 'z1gf', 'z1hf', 'z1gg', 'z1hg', 'z1aq', 'z1bq', 'z1gh', 'z1hh', 'z5hd', 'z5id', 'z5hb', 'z5ib', 'z5hh', 'z5ih',
                 'z5hc', 'z5ic', 'z5hi', 'z5ii', 'z5ak', 'z5bk', 'z5al', 'z5bl', 'z5hf', 'z5if', 'z5hl', 'z5il', 'z5hm', 'z5im', 'z5hz',
                 'z5iz', 'z5xa', 'z5ya', 'z5xb', 'z5yb', 'z5xt', 'z5xu', 'z5xv', 'z5xw', 'z5ta', 'z5ua', 'z5tb', 'z5ub', 'z5kn', 'z5ln',
                 'z5ko', 'z5lo', 'z5kp', 'z5lp', 'z5kb', 'z5lb', 'z5kh', 'z5lh', 'z5kc', 'z5lc', 'z5ki', 'z5li', 'z5df', 'z5ef', 'z5dg',
                 'z5eg', 'z5kf', 'z5lf', 'z5kl', 'z5ll', 'z5te', 'z5ue', 'z5hp', 'z5ip', 'z5hq', 'z5iq', 'z5qb', 'z5rb', 'z5qh', 'z5rh',
                 'z5qc', 'z5rc', 'z5qi', 'z5ri', 'z5xj', 'z5yj', 'z5xk', 'z5yk', 'z5qe', 'z5re', 'z5qk', 'z5rk', 'z5ql', 'z5rl', 'z5qm', 'z5rm',
                 'z1az', 'z1bz', 'z1ao', 'z1bo'],
        inplace=True,
    )
    return df

def clean_chunk(chunk):
chunk.columns = [c.lower() for c in chunk.columns.to_list()]
chunk.fillna(0, inplace=True)
chunk = chunk.astype({"stutile": "str"})
chunk = chunk.astype({"fip18_c": "str"})
    # tsirna and srbg are sign flags stored as "+" / "-": map them to +/-1
    # so the signed values of rnirp8 and rbg can be restored below
    chunk = chunk.replace({"tsirna": {"+": 1, "-": -1}, "srbg": {"+": 1, "-": -1}})
chunk = chunk.astype({"tsirna": "int32", "srbg": "int32"})
chunk["rnirp8"] = chunk["rnirp8"] * chunk["tsirna"]
chunk["rbg"] = chunk["rbg"] * chunk["srbg"]
chunk["f+h"] = chunk["f"] + chunk["h"]
# chunk["zp+zf"] = chunk["zp"] + chunk["zf"] => Its a boolean !!!
chunk.drop(["tsirna", "srbg"], axis=1, inplace=True)
chunk = sum_columns(chunk)
    return chunk

Reading the SAS file
We read the file in chunks of about 2 million rows so as not to saturate memory; there are roughly 39.5 million rows in total.
Each chunk is saved as it is processed, in Apache Parquet format.
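As a rough order-of-magnitude check (the ~150-column width of the raw file is an assumption, not a figure from the source):

# Hypothetical back-of-the-envelope estimate of memory per chunk
n_cols = 150  # assumed raw column count; adjust to the real POTE extraction
bytes_per_chunk = taille_chunk * n_cols * 8  # float64 cells
print(f"~{bytes_per_chunk / 2**30:.1f} GiB per chunk")  # ~2.3 GiB under these assumptions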
# Runtime on CASD: around 20 minutes.
# Wipe the output directory
shutil.rmtree(OUT_PATH, ignore_errors=True)
Path(OUT_PATH).mkdir(parents=True, exist_ok=True)
dfi = pd.read_sas(
SAS_FILE, chunksize=taille_chunk, encoding="iso8859-15", iterator=True
)
dd_values = None
i = 0
print(f"Nombre d'itérations : {39512402/taille_chunk:.0f}")
for chunk in tqdm(dfi):
chunk = clean_chunk(chunk)
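    # Convert the cleaned pandas chunk to a vaex dataframe and write it out
    # as one Parquet file per chunk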
dd_values = vaex.from_pandas(chunk, copy_index=False)
dd_values.export(f"{OUT_PATH}pote_brutes_{year}_{i}.parquet")
del dd_values
dd_values = None
#### DEBUG
i += 1
# if i>=2:
# break
#### DEBUG

Number of iterations: 19
CPU times: total: 20min 54s
Wall time: 22min
19it [22:00, 69.48s/it]
Verification
chunk  # peek at the last processed chunk

from leximpact_prepare_data.calib_and_copules import tc
dfv = vaex.open(f"{OUT_PATH}pote_brutes_{year}_*.parquet")

tc.assertEqual(len(dfv), 39_264_696)  # 39_512_402
tc.assertGreaterEqual(dfv["revkire"].count(), 36644848)
tc.assertGreaterEqual(dfv["revkire"].sum(), 1_084_000_000_000)