CASD : Extraction de quantiles de POTE

from IPython.core.interactiveshell import InteractiveShell

# Make Jupyter display the value of every bare expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"
# Tax year of the POTE extraction; used in the input directory name and in output file names.
year = "2019"
# year = "2019"
# year = "2018"
# Output directory. It is written with a trailing "/" so that file/directory names
# can be appended by plain string concatenation.
OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\agregats\data/"
# Directory holding the Arrow chunks of raw POTE tax bases for `year`.
ARROW_PATH = OUT_PATH + "assiettes_pote_brutes_" + year + r"-chunk/"
# Chunk size: 2 * 2**20 = 2_097_152 (presumably rows per chunk — confirm).
# Not used by the cells visible here.
taille_chunk = 2 * 2**20  # 2**20 = 1_048_576
# taille_chunk = 5000
import leximpact_prepare_data

# Display the installed package version.
leximpact_prepare_data.__version__
'0.0.17'
import gc
import json

import vaex
from tqdm import tqdm

from leximpact_prepare_data.scenario_tools.calib_and_copules import *
# Open every chunk file in ARROW_PATH as a single lazily-loaded vaex DataFrame.
# Loading takes about 8 seconds for 39,264,695 rows, thanks to lazy loading.
dfv = vaex.open(ARROW_PATH + "*")
# dfv = vaex.open(ARROW_PATH + "pote_brutes_2019_5.arrow")
# dfv
# Sanity check on the row count. `tc` (a unittest TestCase, judging by the traceback)
# is not defined in the visible cells; presumably it comes from the star import of
# calib_and_copules — confirm.
# NOTE(review): this assertion failed in the recorded run (39818227 != 39264696),
# and the asserted value is one more than the count quoted above (39,264,695).
# Confirm the expected row count for this extraction before relying on it.
tc.assertEqual(len(dfv), 39264696)
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
File <timed exec>:5, in <module>

File C:\Users\Public\Documents\Anaconda\envs\leximpa\lib\unittest\case.py:837, in TestCase.assertEqual(self, first, second, msg)
    833 """Fail if the two objects are unequal as determined by the '=='
    834    operator.
    835 """
    836 assertion_func = self._getAssertEqualityFunc(first, second)
--> 837 assertion_func(first, second, msg=msg)

File C:\Users\Public\Documents\Anaconda\envs\leximpa\lib\unittest\case.py:830, in TestCase._baseAssertEqual(self, first, second, msg)
    828 standardMsg = '%s != %s' % _common_shorten_repr(first, second)
    829 msg = self._formatMessage(msg, standardMsg)
--> 830 raise self.failureException(msg)

AssertionError: 39818227 != 39264696
# Temps d'exécution : 2 secondes
# pyramide_des_ages = dfv.groupby(by="aged", agg={"age": vaex.agg.count("aged")})
# pyramide_des_ages
CPU times: total: 0 ns
Wall time: 0 ns
# dfv.info()
# List the names of the columns available in the loaded data.
dfv.get_column_names()
['mat',
 'aged',
 'agec',
 'zf',
 'zp',
 'zn',
 'stutile',
 'f',
 'clirpg',
 'g',
 'r',
 'j',
 'h',
 'i',
 'p',
 'nbefi',
 'nbfoy',
 'nbpldm',
 'rimp',
 'rnirp8',
 'rbg',
 'mnrvr3',
 'txmoy',
 'revkire',
 'z1ak',
 'z1az',
 'z1bk',
 'z1bz',
 'z8uy',
 'mnipeg',
 'mnrvi2',
 'mnrvk',
 'mnrvni',
 'rnimeh',
 'rnirai',
 'rnirdu',
 'rnirgi',
 'frf',
 'impotnet',
 'impot',
 'nbpart',
 'fip18_c',
 'f+h']
# Scratch expression: builds the list ['f', 'g', 'h', 'i', 'r', 'p'], i.e. single-letter
# column names that appear in dfv. Its result is not stored.
"f g h i r p".split(" ")
['f', 'g', 'h', 'i', 'r', 'p']

Variables continues

# "Z1ak Z1bk txmoy impot impotnet rnirp8 rnimeh tsirna mnipeg rnirai rnirdu rnirgi Z1az Z1bz".split(" ")
# continuous_variables = [
# "mnipeg", Toujours à 0
# "rnirp8",
# "rnimeh",
# "rnirai",
# "rnirdu",
# "rnirgi",
# "Z1az",
# "Z1bz",
# "rimp",
# "rbg",
# "mnrvr3",
# "revkire",
# "Z1ak",
# "Z1bk",
# "Z8uy",
# "MNRVI2",
# "MNRVK",
# "MNRVNI",
# "FRF",
# "Z1ak",
# "Z1bk",
# "txmoy",
# "impot",
# "impotnet",
# ]
# continuous_variables = [c.lower() for c in continuous_variables]

Calcul des quantiles

def compute_quantile(vdf, columns=None, quantiles=10):
    """Compute quantiles of DataFrame columns and save each result as a JSON file.

    Missing values in the processed columns are replaced by 0 *in place*, so
    ``vdf`` is modified. For each column ``col`` and each requested number of
    quantiles ``n``, the dict returned by ``Quantile(values).get_quantile(n)`` is
    written to ``quantile_POTE_{n}_{year}_{col}.json`` inside ``OUT_PATH``.

    Args:
        vdf: vaex DataFrame to process (mutated by ``fillna``).
        columns: names of the columns to process; all columns when None or empty.
        quantiles: either a single number of quantiles (e.g. ``10``) or an
            iterable of them (e.g. ``[10, 100]``).

    Errors are handled per column: a failure is reported on stdout and the
    remaining columns are still processed.
    """
    import os

    columns = columns if columns else vdf.get_column_names()
    # A bare int is not iterable. Without this, the default `quantiles=10`
    # raised TypeError in the loop below.
    if isinstance(quantiles, int):
        quantiles = [quantiles]
    vdf.fillna(column_names=columns, value=0, inplace=True)
    for col in tqdm(columns):
        q = None
        try:
            # tolist() materialises the whole column as a Python list in memory.
            q = Quantile(vdf[col].tolist())
            for quantile in quantiles:
                q_dict = q.get_quantile(quantile)
                # os.path.join avoids a doubled separator: OUT_PATH is defined
                # with a trailing "/".
                path = os.path.join(
                    OUT_PATH, f"quantile_POTE_{quantile}_{year}_{col}.json"
                )
                with open(path, "w") as f:
                    f.write(json.dumps(q_dict))
        except Exception as e:
            # Best effort: report and move on to the next column.
            print(f"ERROR processing {col} {e.__class__.__name__} : {e}")
        finally:
            # Release this column's data before the next one is loaded, even
            # when processing failed, to keep peak memory down.
            del q
            gc.collect()
# Time on CASD: about 5 minutes per column.
# Writes the 10- and 100-quantile results (presumably deciles and percentiles) of every
# column of dfv to JSON files; also fills missing values of dfv with 0 in place.
compute_quantile(dfv, quantiles=[10, 100])
100%|███████████████████████████████████████| 23/23 [1:50:32<00:00, 288.38s/it]
CPU times: total: 1h 49min 50s
Wall time: 1h 50min 32s
# Drop the DataFrame reference and force a garbage-collection pass to free memory.
del dfv
gc.collect()
469