ETL_Infra package¶
ETL_Infra.etl_process module¶
ETL Infra
- ETL_Infra.etl_process.create_train_signal(work_dir, old_train_path=None)[source]¶
Creates the TRAIN signal from the prepared FinalSignals in the working directory; a previous assignment can be preserved via old_train_path.
- Parameters:
work_dir (str) – Working directory where the FinalSignals are stored.
old_train_path (pd.DataFrame | None, optional) – If given, a DataFrame with “pid” and “val” columns holding the previous TRAIN values. Defaults to None.
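A minimal usage sketch; the working directory path and the old TRAIN values are illustrative:

```python
import pandas as pd
from ETL_Infra.etl_process import create_train_signal

# Hypothetical previous TRAIN assignment with "pid" and "val" columns.
old_train = pd.DataFrame({"pid": [101, 102, 103], "val": [1, 2, 3]})

# Build the TRAIN signal in the working directory, keeping the old assignments.
create_train_signal("/data/my_etl_workdir", old_train_path=old_train)
```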
- ETL_Infra.etl_process.finish_prepare_load(workdir, dest_folder, dest_rep, to_remove=[], load_only=[], override=False)[source]¶
Finalizes the ETL outputs and prepares the Flow load command to execute in order to complete the load.
- Parameters:
workdir (str) – The working directory where all the ETL outputs are stored
dest_folder (str) – The directory path in which to create the final repository
dest_rep (str) – The name of the repository, used to control the name of the NAME.repository file
to_remove (List[str]) – Optional list of signals to skip when loading
load_only (List[str]) – Optional list; if given, only these signals are loaded (when they exist)
override (bool) – If True, overrides the whole process and redoes it
- Return type:
None
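A hedged usage sketch; the paths, repository name, and the skipped signal name are illustrative:

```python
from ETL_Infra.etl_process import finish_prepare_load

# Prepare the Flow load command for a repository called "my_repo",
# skipping a hypothetical "BP" signal.
finish_prepare_load(
    workdir="/data/my_etl_workdir",
    dest_folder="/data/repositories/my_repo",
    dest_rep="my_repo",
    to_remove=["BP"],
    override=False,
)
```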
- ETL_Infra.etl_process.generate_labs_mapping_and_units_config(df, samples_per_signal=5)[source]¶
Creates a config table for signal+unit under CODE_DIR/config/map_units_stats.cfg
- Parameters:
df (DataFrame) – The DataFrame to process; must have the columns: signal, unit
samples_per_signal (int) – How many example values to fetch for each signal+unit combination
- Return type:
None
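A sketch of the expected input, assuming a DataFrame that already has signal and unit columns; the values, and the use of value_0 as the raw value column, are assumptions:

```python
import pandas as pd
from ETL_Infra.etl_process import generate_labs_mapping_and_units_config

labs = pd.DataFrame({
    "signal": ["Hemoglobin", "Hemoglobin", "Glucose"],
    "unit": ["g/dL", "g/L", "mg/dL"],
    "value_0": [13.5, 135.0, 92.0],  # raw value column name is an assumption
})
# Writes CODE_DIR/config/map_units_stats.cfg with 5 sample values per signal+unit.
generate_labs_mapping_and_units_config(labs, samples_per_signal=5)
```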
- ETL_Infra.etl_process.map_and_fix_units(df)[source]¶
Uses the units configuration file to convert the values to the target units.
- Parameters:
df (DataFrame) – The DataFrame to process; must have the columns: signal, unit
- Return type:
DataFrame
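A minimal sketch of converting units, assuming the same column layout as above (column names and values are illustrative):

```python
import pandas as pd
from ETL_Infra.etl_process import map_and_fix_units

labs = pd.DataFrame({
    "signal": ["Hemoglobin", "Hemoglobin"],
    "unit": ["g/dL", "g/L"],
    "value_0": [13.5, 135.0],  # raw value column name is an assumption
})
# Returns a DataFrame with values converted according to the units configuration file.
labs_fixed = map_and_fix_units(labs)
```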
- ETL_Infra.etl_process.prepare_dicts(workdir, signal, def_dict=None, set_dict=None, add_missing_codes=True)[source]¶
Main entry point for preparing dictionaries; takes the working directory, the signal, and two optional arguments for adding custom client dictionaries for this signal.
- Parameters:
workdir (str) – The working directory where all the ETL outputs are stored
signal (str) – The name of the signal, or several signal names separated by commas
def_dict (DataFrame | None) – Optional DataFrame with 2 columns: the first column is the internal code, i.e. the value used in the loading files; the second column is the description of this code, which is shown next to the internal code and can be used to query the code by description
set_dict (DataFrame | None) – Optional DataFrame, used if there are sets inside the client dictionary.
add_missing_codes (bool) – If True, will try to “trim” long codes into shorter ones and search for a matching code
- Todo:
change signal to List
- Return type:
None
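A hedged sketch of passing a custom client dictionary; the signal name, codes, descriptions, and column names are illustrative (only the column order matters per the description above):

```python
import pandas as pd
from ETL_Infra.etl_process import prepare_dicts

# First column: internal code used in the loading files;
# second column: human-readable description.
client_dict = pd.DataFrame({
    "code": ["D123", "D456"],
    "description": ["Type 2 diabetes", "Hypertension"],
})

prepare_dicts(
    workdir="/data/my_etl_workdir",
    signal="DIAGNOSIS",      # illustrative signal name
    def_dict=client_dict,
    add_missing_codes=True,
)
```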
- ETL_Infra.etl_process.prepare_final_signals(data, workdir, sigs, batch_size=0, start_write_batch=None, skip_batch_tests=False, editor=None, map_editor=None, override='n', interactive=False)[source]¶
The main function to generate FinalSignals using batches of DataFrames.
The code will do the following for each batch:
1. Convert “pid” to numeric when it is not numeric. If the signal is part of “demographic”,
   a mapping file from the string to a numeric ID is created. If the signal is not demographic,
   the existing mapping is used for the conversion. The mapping is stored inside workdir/FinalSignals/ID2NR.
2. If there is a “signal” column and the signal is “unknown”, try to use “config/map.tsv”
   to map the signal to an existing signal.
3. Run the suitable processing unit from CODE_DIR/signal_processings/$SIGNAL_NAME_OR_PROCESSING_UNIT_NAME_OR_TAG.py.
   The most specific code file is called, following a logic similar to class inheritance:
   if the signal is Hemoglobin, which is also tagged as “cbc” and “labs”,
   the code first looks for “Hemoglobin.py”, then “cbc.py”, and then “labs.py”.
   If the code file is missing or the directory doesn’t exist, the code creates the directory and a file with instructions.
   The signal is determined by the “signal” column if it is not null; otherwise the “sigs” parameter is used.
4. Test the resulting DataFrame: first signal-format tests (the time channel is an integer with valid dates,
   the value is numeric, categorical values don’t contain invalid characters, etc.), then deeper tests that
   can be extended based on the signal labels, such as testing the value range.
5. Sort and store the file in the right place under WORK_DIR/FinalSignals.
- Parameters:
data (Callable[[int, int], Generator[DataFrame, None, None]] | DataFrame) – A lazy iterator over DataFrames (or a single DataFrame); the only constraint is that each DataFrame has a “pid” column. To use the iterator, two integers are specified: the batch size and the starting batch position.
workdir (str) – The working directory where all the ETL outputs are stored
sigs (str) – Comma-separated names of logic units to execute on each DataFrame for records without a “signal” value
batch_size (int) – Controls the batch size passed to the lazy iterator
start_write_batch (int | None) – If multiple data sources generate the same signal, we want to avoid overwriting files in FinalSignals. To handle this, give each call of prepare_final_signals a different starting “batch” number for writing the output
skip_batch_tests (bool) – Controls whether to skip the tests between batches
override (Literal['y', 'n']) – If “y”, redo everything and ignore the current status. The default is “n”: use the current state and skip completed batches, or skip this processing entirely if all batches are completed.
editor (str | None)
map_editor (str | None)
interactive (bool)
- Returns:
True if finished successfully.
- Return type:
bool
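A minimal sketch of driving the batch flow with a lazy iterator; the file paths, fetch logic, and signal name are assumptions:

```python
import glob
import pandas as pd
from ETL_Infra.etl_process import prepare_final_signals

def fetch_batches(batch_size, start_batch):
    """Hypothetical lazy reader: yields DataFrames that have at least a 'pid' column.
    In this toy sketch each file is one batch, so batch_size is not used."""
    files = sorted(glob.glob("/data/raw/labs_*.csv"))  # illustrative source files
    for path in files[start_batch:]:
        yield pd.read_csv(path)

ok = prepare_final_signals(
    data=fetch_batches,
    workdir="/data/my_etl_workdir",
    sigs="labs",        # processing unit applied to records without a "signal" value
    batch_size=1,
)
print("finished:", ok)
```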
ETL_Infra.plot_graph module¶
- plot_graph.plot_graph(obj, save_path, title='Test', mode='markers+lines', javascript_path='W:\\Graph_Infra\\plotly-latest.min.js')[source]¶
Plots a graph using Plotly.
- Parameters:
obj (Dict[str, DataFrame] | DataFrame) – A dictionary mapping each series name to a DataFrame with the data (or a single DataFrame). The first column is the x-axis data, the second is the y-axis data
save_path (str) – Path in which to store the HTML file
title (str) – The graph title
mode (str) – Controls the graph type: “bar”, or mode options for a scatter plot
javascript_path (str) – Path to the Plotly JavaScript file
- Return type:
None
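A sketch of plotting a single series; the data and the output path are illustrative:

```python
import pandas as pd
from ETL_Infra.plot_graph import plot_graph

# First column is x, second is y.
series = pd.DataFrame({"x": [1, 2, 3], "y": [10.0, 12.5, 11.0]})
plot_graph({"my_series": series}, save_path="/tmp/my_graph.html", title="My graph")
```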
ETL_Infra.etl_unit_conversions module¶
- class ETL_Infra.unit_conversions.SuggestionResult(direction, _bias, _factor, _distance, _desc, _target, _grp_size, _current_median, _expected_median)[source]¶
Bases: object
A class that holds a unit-suggestion linear transformation
- Parameters:
direction (bool)
_bias (float)
_factor (float)
_distance (float)
_desc (str)
_target (str)
_grp_size (int)
_current_median (float)
_expected_median (float)
- bias: float¶
- current_median: float¶
- description: str¶
- distance: float¶
- expected_median: float¶
- factor: float¶
- group_size: int¶
- opposite_transformation: bool¶
- target_unit: str¶
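A small sketch of reading the documented fields from a SuggestionResult returned by the functions below; the printing helper itself is hypothetical:

```python
from ETL_Infra.unit_conversions import SuggestionResult

def describe_suggestion(s: SuggestionResult) -> None:
    """Print the main fields of a SuggestionResult (a plain data holder)."""
    print(f"target unit: {s.target_unit}")
    print(f"linear transformation: bias={s.bias}, factor={s.factor}")
    print(f"distance from expected median: {s.distance} (group size {s.group_size})")
```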
- ETL_Infra.unit_conversions.find_best_unit_suggestion(df, allow_op=False, diff_threshold_percentage=0.5, diff_threshold_ratio=3, min_grp_size=500)[source]¶
Receives a DataFrame with multiple signals and units, breaks the results down by each (signal, unit) group, and ranks the most suitable linear unit transformations [bias + factor] for each group. It also returns the distance from the expected median value of the target unit.
- Parameters:
df (DataFrame) – Input DataFrame with signal and value_0 columns
allow_op (bool) – If True, also tests the opposite transformation of the linear suggestion.
diff_threshold_percentage (float) – The maximum difference, as a fraction [0-1], from the best match within which other options are still suggested
min_grp_size (int) – Groups smaller than this size are ignored
diff_threshold_ratio (float)
- Return type:
Dict[Tuple[str, str] | str, List[SuggestionResult]]
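A hedged sketch of ranking unit suggestions for a raw labs DataFrame; the column contents are illustrative, and min_grp_size is lowered only because the toy sample is tiny:

```python
import pandas as pd
from ETL_Infra.unit_conversions import find_best_unit_suggestion

raw = pd.DataFrame({
    "signal": ["Hemoglobin"] * 3,
    "unit": ["g/L"] * 3,
    "value_0": [135.0, 128.0, 142.0],
})
suggestions = find_best_unit_suggestion(raw, allow_op=False, min_grp_size=1)
for group, ranked in suggestions.items():
    best = ranked[0]  # suggestions are ranked per (signal, unit) group
    print(group, best.target_unit, best.bias, best.factor, best.distance)
```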
- ETL_Infra.unit_conversions.find_best_unit_suggestion_to_group(df, optional_factors=None, allow_op=False, diff_threshold_percentage=0.5, diff_threshold_ratio=3, signal_value_stats_df=None, unit_suggestion_cfg=None, signal_info=None, group_name=None)[source]¶
Receives a DataFrame with raw values and optional transformation tuples [bias, factor], and chooses the best one.
- Parameters:
df (DataFrame) – Input DataFrame with signal and value_0 columns
optional_factors (List[Tuple[float, float, str]] | None) – List of candidate linear transformations; each tuple holds the [bias, factor] plus a string describing the transformation
allow_op (bool) – If True, also tests the opposite transformation of the linear suggestion.
diff_threshold_percentage (float) – The maximum difference, as a fraction [0-1], from the best match within which other options are still suggested
signal_value_stats_df (DataFrame | None) – A DataFrame with signal and percentile statistics (median value). If not given, it is read again from the resources directory.
unit_suggestion_cfg (DataFrame | None) – A DataFrame with unit-suggestion transformations for each signal and unit. If not given, it is read again from the resources directory.
signal_info (Dict[str, SignalInfo] | None) – An object that holds signal information, such as the target unit. If None, it is read from the resources directory
diff_threshold_ratio (float)
group_name (tuple[str, str] | None)
- Return type:
List[SuggestionResult] | None
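A sketch for a single (signal, unit) group with explicit candidate transformations; the tuples are illustrative, and treating the third element as a label string is an assumption:

```python
import pandas as pd
from ETL_Infra.unit_conversions import find_best_unit_suggestion_to_group

raw = pd.DataFrame({
    "signal": ["Hemoglobin"] * 3,
    "unit": ["g/L"] * 3,
    "value_0": [135.0, 128.0, 142.0],
})
# Candidate linear transformations: (bias, factor, label); the label is assumed
# to describe the transformation.
candidates = [(0.0, 0.1, "g/L -> g/dL"), (0.0, 1.0, "identity")]

ranked = find_best_unit_suggestion_to_group(
    raw,
    optional_factors=candidates,
    group_name=("Hemoglobin", "g/L"),
)
if ranked:
    print(ranked[0].target_unit, ranked[0].distance)
```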