from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

CASD: extracting quantiles from POTE
year = "2019"
# year = "2019"
# year = "2018"
OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\agregats\data/"
ARROW_PATH = OUT_PATH + "assiettes_pote_brutes_" + year + r"-chunk/"
taille_chunk = 2 * 2**20 # 2**20 = 1_048_576
# taille_chunk = 5000
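For context, the per-chunk Arrow files opened below were produced upstream on CASD. The sketch that follows is only a hypothetical illustration, assuming an in-memory pandas DataFrame, of how files of taille_chunk rows could be written with pyarrow; it is not the actual export step.
import pyarrow as pa
import pyarrow.feather as feather


def write_pote_chunks(pote_df, arrow_path=ARROW_PATH, chunk_size=taille_chunk):
    # Hypothetical helper (assumed name, not the actual CASD export step): writes
    # one uncompressed Feather v2 (.arrow) file per chunk of `chunk_size` rows,
    # from an assumed in-memory pandas DataFrame `pote_df`, so that vaex can
    # memory-map the chunks later.
    for i in range(0, len(pote_df), chunk_size):
        table = pa.Table.from_pandas(pote_df.iloc[i : i + chunk_size])
        feather.write_feather(
            table,
            f"{arrow_path}pote_brutes_{year}_{i // chunk_size}.arrow",
            compression="uncompressed",
        )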
import leximpact_prepare_data

leximpact_prepare_data.__version__

'0.0.17'
import gc
import json
import vaex
from tqdm import tqdm
from leximpact_prepare_data.scenario_tools.calib_and_copules import *

# Loading time: 8 seconds for 39,264,695 rows, thanks to lazy loading!
dfv = vaex.open(ARROW_PATH + "*")
# dfv = vaex.open(ARROW_PATH + "pote_brutes_2019_5.arrow")
# dfv
tc.assertEqual(len(dfv), 39264696)

AssertionError: 39818227 != 39264696
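A brief aside on the lazy loading mentioned above: vaex memory-maps the Arrow chunks, so opening them is almost instantaneous and nothing is computed until an aggregation is requested. The two lines below are a hypothetical illustration using the rbg and frf columns from the list further down, not part of the extraction itself.
expr = dfv["rbg"] + dfv["frf"]  # lazy vaex expression: nothing is read or computed yet
# expr.sum()  # only an aggregation like this would stream over the memory-mapped chunks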
# Execution time: 2 seconds
# pyramide_des_ages = dfv.groupby(by="aged", agg={"age": vaex.agg.count("aged")})
# pyramide_des_ages

CPU times: total: 0 ns
Wall time: 0 ns
# dfv.info()
dfv.get_column_names()

['mat',
'aged',
'agec',
'zf',
'zp',
'zn',
'stutile',
'f',
'clirpg',
'g',
'r',
'j',
'h',
'i',
'p',
'nbefi',
'nbfoy',
'nbpldm',
'rimp',
'rnirp8',
'rbg',
'mnrvr3',
'txmoy',
'revkire',
'z1ak',
'z1az',
'z1bk',
'z1bz',
'z8uy',
'mnipeg',
'mnrvi2',
'mnrvk',
'mnrvni',
'rnimeh',
'rnirai',
'rnirdu',
'rnirgi',
'frf',
'impotnet',
'impot',
'nbpart',
'fip18_c',
'f+h']
"f g h i r p".split(" ")['f', 'g', 'h', 'i', 'r', 'p']
Continuous variables
# "Z1ak Z1bk txmoy impot impotnet rnirp8 rnimeh tsirna mnipeg rnirai rnirdu rnirgi Z1az Z1bz".split(" ")# continuous_variables = [
# "mnipeg", Toujours à 0
# "rnirp8",
# "rnimeh",
# "rnirai",
# "rnirdu",
# "rnirgi",
# "Z1az",
# "Z1bz",
# "rimp",
# "rbg",
# "mnrvr3",
# "revkire",
# "Z1ak",
# "Z1bk",
# "Z8uy",
# "MNRVI2",
# "MNRVK",
# "MNRVNI",
# "FRF",
# "Z1ak",
# "Z1bk",
# "txmoy",
# "impot",
# "impotnet",
# ]
# continuous_variables = [c.lower() for c in continuous_variables]

Computing the quantiles
def compute_quantile(vdf, columns=None, quantiles=(10,)):
    # Replace missing values by 0 so that every row enters the quantile computation.
    vdf.fillna(column_names=columns, value=0, inplace=True)
    # vdf.fillnan(column_names=columns, value=0, inplace=True)
    columns = columns if columns else vdf.get_column_names()
    for col in tqdm(columns):
        try:
            # print(col)
            q = Quantile(vdf[col].tolist())
            for quantile in quantiles:
                q_dict = q.get_quantile(quantile)
                # One JSON file per column and per number of quantiles (10 = deciles, 100 = centiles).
                with open(
                    f"{OUT_PATH}/quantile_POTE_{quantile}_{year}_{col}.json", "w"
                ) as f:
                    f.write(json.dumps(q_dict))
            del q
            gc.collect()
        except Exception as e:
            print(f"ERROR processing {col} {e.__class__.__name__} : {e}")
            continue
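If only the continuous variables listed (and commented out) above were needed, the same function could be restricted to them; the call below is hypothetical and assumes that list has been uncommented, whereas the actual run that follows processes every column.
# compute_quantile(dfv, columns=continuous_variables, quantiles=[10, 100])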
# Time on CASD: 5 minutes per column
compute_quantile(dfv, quantiles=[10, 100])

100%|███████████████████████████████████████| 23/23 [1:50:32<00:00, 288.38s/it]
CPU times: total: 1h 49min 50s
Wall time: 1h 50min 32s
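As a quick hypothetical check, not part of the original run, one of the JSON files written by compute_quantile could be reloaded; the structure of the dictionary is whatever Quantile.get_quantile returns, and rbg is assumed here to be among the processed columns.
# with open(f"{OUT_PATH}/quantile_POTE_10_{year}_rbg.json") as f_json:
#     deciles_rbg = json.load(f_json)  # hypothetical readback of the decile file for rbg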
del dfv
gc.collect()

469