diff --git a/Makefile b/Makefile
index 628dca0a7..4e8307297 100644
--- a/Makefile
+++ b/Makefile
@@ -139,9 +139,6 @@ rebuild-importer-tests-compressed-files:
 
 rebuild-city-codes:
 	export LBB_ENV=development && cd $(PACKAGE_DIR) && python importer/scripts/clean_csv_city_codes.py
-
-daily-json-activity-parser:
-	export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/daily_json_activity_parser.py
 
 # Load testing
 # ------------
@@ -224,6 +221,26 @@ alembic-generate-migration:
 	@echo
 	@echo "    $$ alembic revision -m 'create account table'"
 
+# Impact retour à l'emploi
+# ------------------------
+daily-json-activity-parser:
+	export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/daily_json_activity_parser.py
+
+join_activity_logs_and_dpae:
+	export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/join_activity_logs_dpae.py
+
+clean_activity_logs_and_dpae:
+	export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/clean_activity_logs_dpae.py
+
+make_report:
+	export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/make_report.py
+
+run_ire_jobs:
+	make join_activity_logs_and_dpae && \
+	make clean_activity_logs_and_dpae && \
+	make make_report && \
+	echo "The new report has been built successfully."
+
 # Importer jobs
 # -------------
 
@@ -316,7 +333,3 @@ clean-car-isochrone-and-durations-cache: clean-car-isochrone-cache
 	PATTERN='*durations**car*' $(MAKE) redis-count-keys
 	@echo '###### EXTERMINATE DURATIONS! \######'
 	$(REDIS_DOCKER_COMPOSE) "$(REDIS_CONNECT) --scan --pattern '*durations**car*' | tr '\n' '\0' | xargs -L1 -0 $(REDIS_CONNECT) del"
-
-delete-unused-redis-containers:
-	docker ps -f status=restarting -f name=redis --format "{{.ID}}" \
-	| xargs docker rm -f
\ No newline at end of file
diff --git a/labonneboite/scripts/impact_retour_emploi/clean_activity_logs_dpae.py b/labonneboite/scripts/impact_retour_emploi/clean_activity_logs_dpae.py
new file mode 100644
index 000000000..45bc0fab8
--- /dev/null
+++ b/labonneboite/scripts/impact_retour_emploi/clean_activity_logs_dpae.py
@@ -0,0 +1,147 @@
+from datetime import date
+import pandas as pd
+from labonneboite.importer import util as import_util
+from labonneboite.importer import settings as importer_settings
+from labonneboite.importer.jobs.common import logger
+
+def clean_csv_act_dpae_file(DEBUG=False, existing_sql_table=True):
+
+    dpae_folder_path = importer_settings.INPUT_SOURCE_FOLDER + '/'
+    csv_path = dpae_folder_path + 'act_dpae.csv'
+
+    df_dpae_act = pd.read_csv(csv_path,
+                              sep='|',
+                              header=0)
+
+    logger.info("The .csv file to clean has {} rows".format(df_dpae_act.shape[0]))
+
+    df_dpae_act = df_dpae_act[df_dpae_act.premiere_embauche == 'Embauche']
+    logger.info("After keeping only 'premiere embauche' rows, the .csv file has {} rows".format(df_dpae_act.shape[0]))
+
+    # remove duplicates when there are multiple activities for the same dpae
+    df_dpae_act = df_dpae_act.sort_values('dateheure')
+    df_dpae_act = df_dpae_act.drop_duplicates(
+        subset=['idutilisateur_peconnect', 'siret'], keep='first')
+    logger.info("After removing duplicates, the .csv file has {} rows".format(df_dpae_act.shape[0]))
+
+    # rename some columns
+    df_dpae_act.rename(columns={'dateheure': 'date_activite',
+                                'kd_dateembauche': 'date_embauche',
+                                'nbrjourtravaille': 'duree_activite_cdd_jours',
+                                'kn_trancheage': 'tranche_age',
+                                'duree_pec': 'duree_prise_en_charge',
+                                'dc_commune_id': 'code_postal'
+                                },
+                       inplace=True)
+
+
+    def get_type_contrat(row):
+        if row['dc_typecontrat_id'] == 1:
+            return 'CDD'
+        elif row['dc_typecontrat_id'] == 2:
+            return 'CDI'
+        return 'CTT'
+    df_dpae_act['type_contrat'] = df_dpae_act.apply(
+        lambda row: get_type_contrat(row), axis=1)
+
+
+    def get_nb_mois(row):
+        return row['duree_activite_cdd_jours'] // 30
+    df_dpae_act['duree_activite_cdd_mois'] = df_dpae_act.apply(
+        lambda row: get_nb_mois(row), axis=1)
+
+
+    def get_nbr_jours_act_emb(row):
+        de = row['date_embauche'][:10].split('-')
+        da = row['date_activite'][:10].split('-')
+        f_date = date(int(da[0]), int(da[1]), int(da[2]))
+        l_date = date(int(de[0]), int(de[1]), int(de[2]))
+        delta = l_date - f_date
+        return delta.days
+    df_dpae_act['diff_activite_embauche_jrs'] = df_dpae_act.apply(
+        lambda row: get_nbr_jours_act_emb(row), axis=1)
+
+
+    def get_priv_pub(row):
+        if row['dc_privepublic'] == 0:
+            return 'Public'
+        return 'Prive'
+    df_dpae_act['dc_privepublic'] = df_dpae_act.apply(
+        lambda row: get_priv_pub(row), axis=1)
+
+
+    def good_format(row):
+        return row['date_embauche'][:-2]
+    df_dpae_act['date_embauche'] = df_dpae_act.apply(
+        lambda row: good_format(row), axis=1)
+
+
+    def del_interrogation(row):
+        if row['tranche_age'] == 'de 26 ans ? 50 ans':
+            return 'entre 26 et 50 ans'
+        return row['tranche_age']
+    df_dpae_act['tranche_age'] = df_dpae_act.apply(
+        lambda row: del_interrogation(row), axis=1)
+
+
+    def del_cdd_incoherent(row):
+        try:
+            if int(row['duree_activite_cdd_jours']) > 1200:
+                return 1
+            return 0
+        except (ValueError, TypeError):  # non-numeric or missing duration
+            return 0
+    df_dpae_act['temporaire'] = df_dpae_act.apply(
+        lambda row: del_cdd_incoherent(row), axis=1)
+    df_dpae_act = df_dpae_act[df_dpae_act.temporaire == 0]
+    logger.info("After removing contracts too long to be legal, the .csv file has {} rows".format(df_dpae_act.shape[0]))
+
+    # The only August activities are on 31/08/2018 --> ugly charts, so we start from the 1st of September
+    df_dpae_act = df_dpae_act[df_dpae_act.date_activite > "2018-08-31"]
+    logger.info("After removing activities dated 31/08/2018, the .csv file has {} rows".format(df_dpae_act.shape[0]))
+
+    cols_of_interest = ['idutilisateur_peconnect',
+                        'siret',
+                        'date_activite',
+                        'date_embauche',
+                        'type_contrat',
+                        'duree_activite_cdd_mois',
+                        'duree_activite_cdd_jours',
+                        'diff_activite_embauche_jrs',
+                        'dc_lblprioritede',
+                        'tranche_age',
+                        'dc_privepublic',
+                        'duree_prise_en_charge',
+                        'dn_tailleetablissement',
+                        'code_postal']
+
+    df_dpae_act = df_dpae_act[cols_of_interest]
+
+    engine = import_util.create_sqlalchemy_engine()
+
+    if existing_sql_table:
+
+        query = "select * from act_dpae_clean"
+        df_dpae_act_existing = pd.read_sql_query(query, engine)
+
+        # In case a problem appears in the script, we save the old data as a .csv backup.
+        # Because we rewrite the whole table after each execution, we have to remove duplicates.
+        df_dpae_act_existing.to_csv(dpae_folder_path+'backup_sql_act_dpae_clean', encoding='utf-8', sep='|')
+        logger.info("The existing act/dpae table has {} rows".format(df_dpae_act_existing.shape[0]))
+        df_dpae_act = pd.concat([df_dpae_act, df_dpae_act_existing])
+        logger.info("The concatenation of both has {} rows".format(df_dpae_act.shape[0]))
+
+        df_dpae_act = df_dpae_act.drop_duplicates(
+            subset=['idutilisateur_peconnect', 'siret'], keep='first')
+        logger.info("The concatenation of both, minus duplicates, has {} rows".format(df_dpae_act.shape[0]))
+
+    df_dpae_act.to_sql(con=engine, name='act_dpae_clean',
+                       if_exists='replace', index=False, chunksize=10000)
+
+    engine.close()
+
+def run_main():
+    
clean_csv_act_dpae_file(existing_sql_table=True) + +if __name__ == '__main__': + run_main() \ No newline at end of file diff --git a/labonneboite/scripts/impact_retour_emploi/clean_tre.py b/labonneboite/scripts/impact_retour_emploi/clean_tre.py deleted file mode 100644 index 24393df68..000000000 --- a/labonneboite/scripts/impact_retour_emploi/clean_tre.py +++ /dev/null @@ -1,400 +0,0 @@ -import urllib -import shutil -from os import makedirs, remove, listdir -from os.path import abspath, exists -from datetime import date -import pandas as pd -import openpyxl -import openpyxl.styles -from sqlalchemy import create_engine -import charts as charts -import fr_charts as fr -import grand_public as gd - -ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' - -# SQL IDPE Connect -engine = create_engine('mysql://labonneboite:%s@127.0.0.1:3306/labonneboite' % - urllib.parse.quote_plus('LaB@nneB@ite')) -engine.connect() - -# For the evolution of number of IDPEC -query = ''' -SELECT count(DISTINCT idutilisateur_peconnect) as count_distinct_idpe, MONTH(dateheure), YEAR(dateheure) -FROM idpe_connect -GROUP BY MONTH(dateheure), YEAR(dateheure) -ORDER BY YEAR(dateheure), MONTH(dateheure); -''' -idpe_connect = pd.read_sql_query(query, engine) -month_idpe = idpe_connect['MONTH(dateheure)'].tolist() -year_idpe = idpe_connect['YEAR(dateheure)'].tolist() - -month_year_idpe = [] # Formatting -i=0 -while i < len(month_idpe): - month_year_idpe.append(str(year_idpe[i])+'/'+str(month_idpe[i])) - i+=1 -idpe_connect['Date'] = month_year_idpe - -# Count all distinct IDPEC -query_bis = ''' -SELECT count(DISTINCT idutilisateur_peconnect) -FROM idpe_connect; -''' -total_idpe_connect = pd.read_sql_query(query_bis, engine) - -# Count only significative activity -query_ter = ''' -SELECT count(DISTINCT idutilisateur_peconnect) -FROM activity_logs; -''' -total_idpe_connect_sign = pd.read_sql_query(query_ter, engine) - - -print('SQL query done !') - -# Creation of all directory needed -path = abspath('clean_tre.py')[:-12] -try: - shutil.rmtree(path+'images/') -except: - pass -try: - shutil.rmtree(path+'gd_pub/') -except: - pass -try: - shutil.rmtree(path+'Clean/') -except: - pass - -makedirs(path+'images/') -makedirs(path+'gd_pub/') -makedirs(path+'Clean/') - - -# Load CSV and rename columns,values etc.. 
: -act_dpae = pd.read_csv(path+'act_dpae.csv', - sep='|', - header=0) - -# remove duplicates when multiple activities for the same dpae -act_dpae_bis = act_dpae[act_dpae.premiere_embauche == 'Embauche'] -act_dpae_1 = act_dpae_bis.sort_values('dateheure') -act_dpae_2 = act_dpae_1.drop_duplicates( - subset=['idutilisateur-peconnect', 'siret'], keep='first') - -# rename some columns -act_dpae_2.rename(columns={'dateheure': 'date_activite', - 'kd_dateembauche': 'date_embauche', - 'nbrjourtravaille': 'duree_activite_cdd_jours', - 'kn_trancheage': 'tranche_age', - 'duree_pec': 'duree_prise_en_charge', - 'dc_commune_id': 'code_postal' - }, - inplace=True) - - -def get_type_contrat(row): - if row['dc_typecontrat_id'] == 1: - return 'CDD' - elif row['dc_typecontrat_id'] == 2: - return 'CDI' - return 'CTT' - - -act_dpae_2['type_contrat'] = act_dpae_2.apply( - lambda row: get_type_contrat(row), axis=1) - -# TODO : use lambdas functions - - -def get_nb_mois(row): - return row['duree_activite_cdd_jours'] // 30 - - -act_dpae_2['duree_activite_cdd_mois'] = act_dpae_2.apply( - lambda row: get_nb_mois(row), axis=1) - - -def get_nbr_jours_act_emb(row): - de = row['date_embauche'][:10].split('-') - da = row['date_activite'][:10].split('-') - f_date = date(int(da[0]), int(da[1]), int(da[2])) - l_date = date(int(de[0]), int(de[1]), int(de[2])) - delta = l_date - f_date - return delta.days - - -act_dpae_2['diff_activite_embauche_jrs'] = act_dpae_2.apply( - lambda row: get_nbr_jours_act_emb(row), axis=1) - - -def get_priv_pub(row): - if row['dc_privepublic'] == 0: - return 'Public' - return 'Prive' - - -act_dpae_2['dc_privepublic'] = act_dpae_2.apply( - lambda row: get_priv_pub(row), axis=1) - - -def good_format(row): - return row['date_embauche'][:-2] - - -act_dpae_2['date_embauche'] = act_dpae_2.apply( - lambda row: good_format(row), axis=1) - - -def del_interrogation(row): - if row['tranche_age'] == 'de 26 ans ? 
50 ans': - return 'entre 26 et 50 ans' - return row['tranche_age'] - - -act_dpae_2['tranche_age'] = act_dpae_2.apply( - lambda row: del_interrogation(row), axis=1) - - -def del_cdd_incoherent(row): - try: - if int(row['duree_activite_cdd_jours']) > 1200: - return 1 - return 0 - except: - return 0 - - -act_dpae_2['temporaire'] = act_dpae_2.apply( - lambda row: del_cdd_incoherent(row), axis=1) -act_dpae_2_bis = act_dpae_2[act_dpae_2.temporaire == 0] - -# We only have activities in august for 31/08/2018 --> ugly charts, we want to start from the 1st september -act_dpae_2_bis = act_dpae_2_bis[act_dpae_2_bis.date_activite > "2018-08-31"] - -cols_of_interest = ['date_activite', - 'date_embauche', - 'type_contrat', - 'duree_activite_cdd_mois', - 'duree_activite_cdd_jours', - 'diff_activite_embauche_jrs', - 'dc_lblprioritede', - 'tranche_age', - 'dc_privepublic', - 'duree_prise_en_charge', - 'dn_tailleetablissement', - 'code_postal'] - -act_dpae_3 = act_dpae_2_bis[cols_of_interest] - - -##### -# CHARTS -##### - -# Names of different legend for Cohortes -all_the_names_1 = ("Total from activity", "Nbre_Total_DPAE", "Mois d'activité", - "Mois d'embauche", "Origine de l'Activité, en fonction du mois d'Embauche") -all_the_names_2 = ('Nbre_Total_DPAE', "Total from activity", "Mois d'embauche", - "Mois d'activité", "Nombres et Mois d'Embauche, en fonction du mois d'Activité") - - -def location(num_image, file_name, link=False): # Pasting of pictures - if link is True: - ws.merge_cells("A21:F21") - ws.merge_cells("G21:L21") - ws.merge_cells("M21:R21") - - num_image += 3 - list_abscisses = ["A", "G", "M"] - if "cohorte" in file_name: - list_abscisses = ["A", "G"] - y = (((num_image//len(list_abscisses)))*20)+1 - x = list_abscisses[num_image % len(list_abscisses)] - return x+str(y) - -# Write Datas and Charts in Excel : - - -# Initialisation -sheet_names = [None, "BoxPlot + Graph", - "Détail Embauches", "Pie Charts", "Map", "Cohortes"] -sheet_sizes = [None, 5, 2, 2, 3] # Number of files per sheet (start at 0) -num_sheet = 1 - -# Writes raw data -wb = openpyxl.Workbook() -wb.save('Temporaire.xlsx') -temporaire_df = pd.ExcelWriter(path+'Temporaire.xlsx', engine='xlsxwriter') -act_dpae_3.to_excel(temporaire_df, 'DPAE', index=False) -temporaire_df.save() -book = openpyxl.load_workbook('Temporaire.xlsx', data_only=True) - -# Extend columns -for i in range(len(act_dpae_3.columns.tolist())): - book.active.column_dimensions[ALPHABET[i]].width = 20 - -# Past of graphics/maps/Pie etc... 
-book.create_sheet(sheet_names[num_sheet]) -ws = book.worksheets[num_sheet] -dict_charts = {('01', "diff_activite_embauche_jrs", "Nombre de Jours entre l'activite sur lbb et la DPAE", "act_emb", 1): charts.BoxPlot, - ('02', 'duree_activite_cdd_jours', "Durée du CDD obtenu", "cdd_duree", 0): charts.BoxPlot, - ('03', 'count_distinct_idpe', "Nbre d'IDPE connect par mois", "idpe_connect", 'Date'): charts.graph_sql, - ('04', 'date_activite', "Nombre d'activités entrainant une DPAE par mois", "act_mo", "month"): charts.Graph, - ('05', 'date_embauche', "Nombre d'embauches par mois", "emb_mo_gd_public_graph", 'month'): charts.Graph, - ('07', 'date_embauche', " : Nombre d'embauches par semaine", "emb_sem", 'week'): charts.Graph, - ('08', "type_contrat", "Type de contrat obtenu", "type_cont_gd_public_pie", None): charts.Pie, - ('10', "tranche_age", "Pourcentage des differentes tranches d'ages dans les DPAE", "age_gd_public_pie", None): charts.Pie, - ('11', "dc_privepublic", "Pourcentage d'embauche dans le privé et dans le public", "prive_pub_gd_public_pie", None): charts.Pie, - ('12', "code_postal", "Part des DPAE par anciennes régions", "old_region_gd_public_svg", "old_region"): fr.map_fr, - ('13', "code_postal", "Part des DPAE par nouvelles régions", "new_region_gd_public_svg", "new_region"): fr.map_fr, - ('14', "code_postal", "Part des DPAE par département", "dep_gd_public_svg", "departement"): fr.map_fr, - ('15', 'date_embauche', all_the_names_1, 'cohorte_1_gd_public', 'date_activite'): charts.Stacked_Bar, - ('16', 'date_activite', all_the_names_2, 'cohorte_2_gd_public', 'date_embauche'): charts.Stacked_Bar} - - -####Boucle ? ################################################################################# -# Add number of DPAE with LBB activity -nbre_DPAE = act_dpae_3['date_activite'].describe().tolist()[0] -ws.merge_cells('A4:F4') -ws.merge_cells('A5:F5') -cell_A4 = ws['A4'] -cell_A4.value = 'Nbre de DPAE ayant pour origine une activité sur LBB' -cell_A4.font = openpyxl.styles.Font(size=10, bold=True, underline='double') -cell_A4.alignment = openpyxl.styles.Alignment(horizontal="center") -cell_A5 = ws['A5'] -cell_A5.value = nbre_DPAE -cell_A5.font = openpyxl.styles.Font(size=10, italic=True) -cell_A5.alignment = openpyxl.styles.Alignment(horizontal="center") -DPAE_for_gd_pub = [cell_A4.value, nbre_DPAE] # for grand_public - -# Add number of IDPE unique -nbre_DPAE = total_idpe_connect.loc[0][0] -ws.merge_cells('A7:F7') -ws.merge_cells('A8:F8') -cell_A7 = ws['A7'] -cell_A7.value = "Nbre d'IDPE connect unique sur LBB depuis 09/18" -cell_A7.font = openpyxl.styles.Font(size=10, bold=True, underline='double') -cell_A7.alignment = openpyxl.styles.Alignment(horizontal="center") -cell_A8 = ws['A8'] -cell_A8.value = nbre_DPAE -cell_A8.font = openpyxl.styles.Font(size=10, italic=True) -cell_A8.alignment = openpyxl.styles.Alignment(horizontal="center") -IDPE_for_gd_pub = [cell_A7.value, nbre_DPAE] # for grand_public - -# Add number of IDPE unique with significative activity -nbre_IDPE_sign = total_idpe_connect_sign.loc[0][0] -ws.merge_cells('A10:F10') -ws.merge_cells('A11:F11') -ws.merge_cells('A12:F12') -cell_A10 = ws['A10'] -cell_A10.value = "Nbre d'IDPE connect unique sur LBB, ayant cliqué sur:" -cell_A10.font = openpyxl.styles.Font(size=10, bold=True, underline='double') -cell_A10.alignment = openpyxl.styles.Alignment(horizontal="center") -cell_A11 = ws['A11'] -cell_A11.value = "'Favoris','Telecharger PDF','Details d'une entreprise' " -cell_A11.font = openpyxl.styles.Font(size=10, bold=True, 
underline='double') -cell_A11.alignment = openpyxl.styles.Alignment(horizontal="center") -cell_A12 = ws['A12'] -cell_A12.value = nbre_IDPE_sign -cell_A12.font = openpyxl.styles.Font(size=10, italic=True) -cell_A12.alignment = openpyxl.styles.Alignment(horizontal="center") -IDPE_sign_for_gd_pub = [cell_A10.value, - nbre_IDPE_sign, cell_A11.value] # for grand_public - -################################################################################################## - -num_im = 1 -package_svg = [] -all_stats = [] - -for args in dict_charts: # Creation and saving of charts, using charts.py - if 'sql' in dict_charts[args].__name__: # choose data - data = idpe_connect - else: - data = act_dpae_3 - - # Creation of charts/maps in directory "images/" - image = dict_charts[args]( - args[0], args[1], data, args[2], args[3], args[4]) # function - - # if sem in args, function return the number of graph by "week". It means that create a new sheet is necessary - # if sem not in args AND image is not None, it means that the function return a list of stats (for gd_pub sheet) - if "sem" in args[3]: - sheet_sizes.insert(2, image-1) - elif image is not None: - all_stats.append(image) - -# Iterate through the created images -# Pasting of charts from the directory -for filename in sorted(listdir(path+'images/')): - - img = openpyxl.drawing.image.Image(path+'images/'+filename) - - if "gd_public" in filename: - shutil.copyfile(path+'images/'+filename, path+'gd_pub/'+filename) - - if "table" in filename: # it's the table of cohorte --> it's a different size - img.anchor = 'H1' - img.height = 750 - img.width = 900 - else: - # using the function location in order to place the charts - img.anchor = location(num_im, filename) - img.height = 400 - img.width = 500 - - ws.add_image(img) # Pasting - - # if it's map --> pasting web link below charts - if exists(path+filename[:-3]+'svg'): - cells_link = ws[location(num_im, filename, True)] - cells_link.hyperlink = filename[:-3]+'svg' - cells_link.font = openpyxl.styles.Font( - size=5.5, italic=True, underline='single') - cells_link.alignment = openpyxl.styles.Alignment(horizontal="center") - package_svg.append((path, filename[:-3]+'svg')) - - num_im += 1 - - # if it's the last charts of the sheet --> change sheet - if num_im == (sheet_sizes[num_sheet]+1): - try: - num_sheet += 1 - book.create_sheet(sheet_names[num_sheet]) - ws = book.worksheets[num_sheet] - num_im = 0 - except: - pass - -book.save('Impact_lbb_DPAE.xlsx') - -# gd_pub sheet -gd.build_grand_public_sheet(DPAE_for_gd_pub, - IDPE_for_gd_pub, - IDPE_sign_for_gd_pub, - all_stats, - book, - path+'gd_pub/') - -# Remove all files/directory useless and create "Clean" package -shutil.rmtree(path+'images/') -shutil.rmtree(path+'gd_pub/') -remove("Temporaire.xlsx") -shutil.copyfile(path+'Impact_lbb_DPAE.xlsx', path + - 'Clean/'+'Impact_lbb_DPAE.xlsx') -for path, svg in package_svg: - shutil.copyfile(path+svg, path+'Clean/'+svg) -remove("table.html") -for last_files in listdir(path): - try: - extension = last_files[last_files.index('.'):] - if extension == '.svg' or extension == '.xlsx': - remove(last_files) - except: - pass # It's a directory - -def run_main(): - return 0 \ No newline at end of file diff --git a/labonneboite/scripts/impact_retour_emploi/join_activity_logs_dpae.py b/labonneboite/scripts/impact_retour_emploi/join_activity_logs_dpae.py new file mode 100644 index 000000000..f554cbafd --- /dev/null +++ b/labonneboite/scripts/impact_retour_emploi/join_activity_logs_dpae.py @@ -0,0 +1,134 @@ +# -*- coding: 
utf-8 -*-
+
+import os
+import urllib
+import pandas as pd
+from labonneboite.importer import util as import_util
+from labonneboite.importer import settings as importer_settings
+from labonneboite.importer.jobs.common import logger
+
+# TODO : to get better data about job sectors, use the NAF code --> Luc Caffier
+
+DEBUG = True
+
+def get_activity_logs():
+    engine = import_util.create_sqlalchemy_engine()
+
+    query = "select * from activity_logs"
+    if DEBUG:
+        query += " ORDER BY RAND() LIMIT 1000000"
+    df_activity = pd.read_sql_query(query, engine)
+
+    engine.close()
+
+    # TODO : define a period after which an activity on LBB is considered to have no impact on the return to employment.
+    # This would let us reload only the activity logs of that period each time, instead of all of them.
+    # https://valodata.slack.com/archives/C0QR8RYL8/p1562319224015200
+    # df_activity = df_activity[df_activity.dateheure > duree_activity_not_prise_en_compte]
+
+    logger.info('Activity logs are loaded')
+
+    return df_activity
+
+def join_dpae_activity_logs(df_activity):
+    # function used to create a new column from the dateheure column in dpae
+    def get_date(row):
+        return row['kd_dateembauche'][:10]
+
+    dpae_folder_path = importer_settings.INPUT_SOURCE_FOLDER + '/'
+    dpae_paths = os.listdir(dpae_folder_path)
+    dpae_paths = [i for i in dpae_paths if i.startswith('LBB_XDPDPAE')]
+
+    dpae_paths.sort()
+    most_recent_dpae_file = dpae_paths[-1]
+
+    logger.info("The DPAE file which will be used is: {}".format(most_recent_dpae_file))
+
+    # We select the most recent hiring date already present in the joined act_dpae_clean table
+    engine = import_util.create_sqlalchemy_engine()
+
+    query = "select date_embauche from act_dpae_clean order by date_embauche DESC LIMIT 1"
+    row = engine.execute(query).fetchone()
+    last_recorded_hiring_date = row[0].split()[0]
+
+    logger.info("The most recent hiring date already recorded is {}".format(last_recorded_hiring_date))
+
+    engine.close()
+
+    # We read the DPAE csv file in smaller chunks
+    if DEBUG:
+        chunksize = 10 ** 5
+    else:
+        chunksize = 10 ** 6
+    i = 0
+
+    column_names = ['kc_siret', 'dc_naf_id', 'dc_adresse', 'dc_codepostal', '_',
+                    'dn_tailleetablissement', 'dc_communenaissancepays', 'kd_dateembauche',
+                    'dc_typecontrat_id', 'dd_datefincdd', 'dc_romev3_1_id', 'dc_romev3_2_id',
+                    'kd_datecreation', 'dc_privepublic', 'dc_commune_id', 'dc_natureemploi_id',
+                    'dc_qualitesalarie_id', 'dn_dureetravailhebdo', 'dn_dureetravailmensuelle',
+                    'dn_dureetravailannuelle', 'nbrjourtravaille', 'iiann', 'dc_lblprioritede',
+                    'kn_trancheage', 'duree_pec', 'dc_ididentiteexterne', 'premiere_embauche']
+
+    total_rows_kept = 0
+
+    for df_dpae in pd.read_csv(dpae_folder_path+most_recent_dpae_file,
+                               header=None,
+                               compression='bz2',
+                               names=column_names,
+                               sep='|',
+                               index_col=False,
+                               chunksize=chunksize):
+
+        logger.info("Sample of DPAE has : {} rows".format(df_dpae.shape[0]))
+
+        # keep only the DPAE whose hiring date is more recent than the last date already recorded
+        df_dpae['kd_dateembauche_bis'] = df_dpae.apply(
+            lambda row: get_date(row), axis=1)
+        df_dpae = df_dpae[df_dpae.kd_dateembauche_bis > last_recorded_hiring_date]
+        logger.info("Sample of DPAE minus old dates has : {} rows".format(df_dpae.shape[0]))
+
+        # convert df dpae columns to 'object'
+        df_dpae = df_dpae.astype(str)
+
+        df_dpae_act = pd.merge(df_dpae,
+                               df_activity,
+                               how='left',
+                               left_on=['dc_ididentiteexterne', 'kc_siret'],
+                               right_on=['idutilisateur_peconnect', 'siret'])
+        logger.info("Sample of merged 
activity/DPAE has : {} rows".format(df_dpae_act.shape[0])) + + # filter on the fact that dateheure activity must be inferior to kd_dateembauche + df_dpae_act = df_dpae_act[df_dpae_act.kd_dateembauche_bis > df_dpae_act.dateheure] + df_dpae_act = df_dpae_act.drop(['kd_dateembauche_bis'], axis=1) + logger.info("Sample of merged activity/DPAE with the good dates has : {} rows".format(df_dpae_act.shape[0])) + + path_to_csv = dpae_folder_path+'act_dpae.csv' + exists = os.path.isfile(path_to_csv) + + #We want to rewrite the CSV file after each new execution of DPAE extraction + if exists and i == 0: + os.remove(path_to_csv) + df_dpae_act.to_csv(path_to_csv, encoding='utf-8', sep='|') + else: + with open(path_to_csv, 'a') as f: + df_dpae_act.to_csv(f, header=False, sep='|') + + total_rows_kept += df_dpae_act.shape[0] + logger.info(" --> Nb rows we keep in this sample : {} rows".format(df_dpae_act.shape[0])) + logger.info(" --> Nb total rows that have been kept : {} rows".format(total_rows_kept)) + logger.info("-------------------------") + i += 1 + + #In debug mode, we stop parsing the DPAE file when we reach (10 ** 5) * 20 = 2 000 000 lines + if DEBUG and i == 20: + break + + return path_to_csv + +def run_main(): + df_activity = get_activity_logs() + join_dpae_activity_logs(df_activity) + +if __name__ == '__main__': + run_main() diff --git a/labonneboite/scripts/impact_retour_emploi/tre_dpae.py b/labonneboite/scripts/impact_retour_emploi/tre_dpae.py deleted file mode 100644 index b12ff9782..000000000 --- a/labonneboite/scripts/impact_retour_emploi/tre_dpae.py +++ /dev/null @@ -1,95 +0,0 @@ -# -*- coding: utf-8 -*- - -import os -import urllib -import pandas as pd -from sqlalchemy import create_engine - -# TODO : To add datas about job sector, use code NAF --> Luc Caffier - -start = 0 -column_names = ['kc_siret', 'dc_naf_id', 'dc_adresse', 'dc_codepostal', '_', - 'dn_tailleetablissement', 'dc_communenaissancepays', 'kd_dateembauche', - 'dc_typecontrat_id', 'dd_datefincdd', 'dc_romev3_1_id', 'dc_romev3_2_id', - 'kd_datecreation', 'dc_privepublic', 'dc_commune_id', 'dc_natureemploi_id', - 'dc_qualitesalarie_id', 'dn_dureetravailhebdo', 'dn_dureetravailmensuelle', - 'dn_dureetravailannuelle', 'nbrjourtravaille', 'iiann', 'dc_lblprioritede', - 'kn_trancheage', 'duree_pec', 'dc_ididentiteexterne', 'premiere_embauche'] - - -# function used to create a new column from dateheure column in dpae -def get_date(row): - return row['kd_dateembauche'][:10] - - -# Get the datas from lab_tre_activity in the mysql of lbbdev -engine = create_engine('mysql://labonneboite:%s@127.0.0.1:3306/labonneboite' % - urllib.parse.quote_plus('LaB@nneB@ite')) -engine.connect() -query = "select * from activity_logs" -df_activity = pd.read_sql_query(query, engine) -''' -df_activity = pd.read_csv('/home/jenkins/tre2/activity_logs.csv') -df_activity_2 = df_activity.astype(str) -''' -df_activity_2 = df_activity[df_activity.dateheure > "2018-08-31"] - - -print('query SQL OK') - -liste_files_dpae = os.listdir('/srv/lbb/data') -liste_files_good = [] -for bz2 in liste_files_dpae: - if 'XDPDPAE' in bz2: - liste_files_good.append(bz2) -liste_files_good.sort() -bz = liste_files_good[-1] -# We use the datas in csv using smaller chunks -chunksize = 10 ** 6 -i = 1 -for df_dpae in pd.read_csv('/srv/lbb/data/'+bz, - header=None, - compression='bz2', - names=column_names, - sep='|', - index_col=False, - chunksize=chunksize): - - # remove rows where the data is > to the 11 10 2018 (dont have activity dates after this) - 
df_dpae['kd_dateembauche_bis'] = df_dpae.apply( - lambda row: get_date(row), axis=1) - # TODO: - df_dpae_2 = df_dpae[df_dpae.kd_dateembauche_bis > "2018-08-31"] - - # convert df dpae columns to 'object' - df_dpae_3 = df_dpae_2.astype(str) - - # TODO : Test entre left & inner - new_df = pd.merge(df_dpae_3, - df_activity_2, - how='left', # 'left' - left_on=['dc_ididentiteexterne', 'kc_siret'], - right_on=['idutilisateur_peconnect', 'siret']) - - new_df_2 = new_df[pd.notnull(new_df['idutilisateur_peconnect'])] - - # filter on the fact that dateheure activity must be inferior to kd_dateembauche - new_df_3 = new_df_2[new_df_2.kd_dateembauche_bis > new_df_2.dateheure] - new_df_4 = new_df_3.drop(['kd_dateembauche_bis'], axis=1) - - # TODO : Compare documents in both folders, - # y'a /srv/lbb/data/LBB_XDPDPAE_2019-04-14_2018-03-01.bz2ptet eu un chmilblik - path = '/home/jenkins/tre2/' - path_to_csv = path+'act_dpae.csv' - exists = os.path.isfile(path_to_csv) - if exists: - with open(path_to_csv, 'a') as f: - new_df_4.to_csv(f, header=False, sep='|') - else: - new_df_4.to_csv(path_to_csv, encoding='utf-8', sep='|') - - print('nb rows : ', i * chunksize) - i += 1 - -def run_main(): - return 0 diff --git a/setup.py b/setup.py index ff6ceeeea..cc82ee2f2 100644 --- a/setup.py +++ b/setup.py @@ -28,8 +28,9 @@ def read(fname): 'populate_flags = labonneboite.importer.jobs.populate_flags:run_main', 'update_lbb_data = labonneboite.importer.importer:run', 'daily_json_activity_parser = labonneboite.scripts.impact_retour_emploi.daily_json_activity_parser:run_main', - 'tre_dpae = labonneboite.scripts.impact_retour_emploi.tre_dpae:run_main', - 'clean_tre = labonneboite.scripts.impact_retour_emploi.clean_tre:run_main' + 'join_activity_logs_dpae = labonneboite.scripts.impact_retour_emploi.join_activity_logs_dpae:run_main', + 'clean_activity_logs_dpae = labonneboite.scripts.impact_retour_emploi.clean_activity_logs_dpae:run_main', + 'make_report = labonneboite.scripts.impact_retour_emploi.make_report:run_main' ], }, classifiers=[