ETL_Infra package

ETL_Infra.etl_process module

ETL Infra

ETL_Infra.etl_process.create_train_signal(work_dir, old_train_path=None)[source]

Creates the TRAIN signal under the FinalSignals directory of the working directory, optionally reusing values from a previous TRAIN signal.

Parameters:
  • work_dir (str) – Working directory where the FinalSignals are stored.

  • old_train_path (pd.DataFrame | None, optional) – if given a dataframe with “pid” and “val” of old train value. Defaults to None.
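A minimal usage sketch (the import path and the train/test encoding in "val" are assumptions for illustration; only the "pid"/"val" column names come from the docstring above):

```python
import pandas as pd

# from ETL_Infra.etl_process import create_train_signal  # assumed import path

# Previous TRAIN assignment: one row per patient, "val" holds the old value.
old_train = pd.DataFrame({
    "pid": [1001, 1002, 1003],
    "val": [1, 2, 1],  # e.g. 1 = train, 2 = test (illustrative encoding)
})

# create_train_signal("/path/to/workdir", old_train_path=old_train)
print(sorted(old_train.columns))  # ['pid', 'val']
```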

ETL_Infra.etl_process.finish_prepare_load(workdir, dest_folder, dest_rep, to_remove=[], load_only=[], override=False)[source]

Finalize and prepare the Flow load command to execute to complete the load

Parameters:
  • workdir (str) – The working directory where we stored all the ETL outputs

  • dest_folder (str) – The path to create the final repository. A Directory

  • dest_rep (str) – The name of the repository, to control the name of the NAME.repository file

  • to_remove (List[str]) – Optional list of signals to skip during loading

  • load_only (List[str]) – Optional list; if given, only these signals will be loaded (when they exist)

  • override (bool) – If True, reruns the whole process, overriding existing outputs

Return type:

None
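The filtering implied by to_remove and load_only can be sketched as follows (illustrative only, not the library's actual implementation; the signal names are made up):

```python
def select_signals(available, to_remove=(), load_only=()):
    """Sketch of the signal selection implied by to_remove / load_only."""
    # Drop skipped signals first.
    selected = [s for s in available if s not in set(to_remove)]
    # If load_only is given, keep only those signals (when they exist).
    if load_only:
        selected = [s for s in selected if s in set(load_only)]
    return selected

signals = ["Hemoglobin", "WBC", "BDATE", "GENDER"]
print(select_signals(signals, to_remove=["WBC"]))              # ['Hemoglobin', 'BDATE', 'GENDER']
print(select_signals(signals, load_only=["BDATE", "GENDER"]))  # ['BDATE', 'GENDER']
```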

ETL_Infra.etl_process.generate_labs_mapping_and_units_config(df, samples_per_signal=5)[source]

Creates a config table for signal+unit under CODE_DIR/config/map_units_stats.cfg

Parameters:
  • df (DataFrame) – The dataframe to process. Must have the columns: signal, unit

  • samples_per_signal (int) – How many example values to fetch for each signal+unit combination

Return type:

None
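A usage sketch (the import path and the extra "value_0" column are assumptions; the docstring above only requires "signal" and "unit"):

```python
import pandas as pd

# from ETL_Infra.etl_process import generate_labs_mapping_and_units_config  # assumed

labs = pd.DataFrame({
    "signal":  ["Hemoglobin", "Hemoglobin", "Glucose"],
    "unit":    ["g/dL", "g/L", "mg/dL"],
    "value_0": [13.5, 135.0, 98.0],
})

# Writes CODE_DIR/config/map_units_stats.cfg with up to 5 samples per signal+unit:
# generate_labs_mapping_and_units_config(labs, samples_per_signal=5)

# Each distinct signal+unit pair becomes one entry in the config:
print(labs.groupby(["signal", "unit"]).size())
```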

ETL_Infra.etl_process.map_and_fix_units(df)[source]

Uses the units configuration file to convert values to the target units

Parameters:

df (DataFrame) – The dataframe to process. Must have the columns: signal, unit

Return type:

DataFrame
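A sketch of the kind of linear conversion the configuration file drives (the import path, the "value_0" column, and the g/L → g/dL rule are illustrative assumptions, not the library's actual config):

```python
import pandas as pd

# from ETL_Infra.etl_process import map_and_fix_units  # assumed import path

df = pd.DataFrame({
    "signal":  ["Hemoglobin", "Hemoglobin"],
    "unit":    ["g/dL", "g/L"],
    "value_0": [13.5, 135.0],
})

# fixed = map_and_fix_units(df)  # returns a DataFrame with converted units

# Illustrative equivalent of one config rule: g/L -> g/dL is a factor of 0.1
mask = df["unit"] == "g/L"
df.loc[mask, "value_0"] = df.loc[mask, "value_0"] * 0.1
df.loc[mask, "unit"] = "g/dL"
print(df["value_0"].tolist())  # [13.5, 13.5]
```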

ETL_Infra.etl_process.prepare_dicts(workdir, signal, def_dict=None, set_dict=None, add_missing_codes=True)[source]

Main entry point for preparing dicts - given workdir, signal, and 2 additional arguments to add custom client dicts for this signal

Parameters:
  • workdir (str) – The working directory where we stored all the ETL outputs

  • signal (str) – The name of the signal, or several signals, comma-separated

  • def_dict (DataFrame | None) – Optional DataFrame with 2 columns: the first column is the internal code, the value we used in the loading files; the second column is the description of this code, so that we will be able to see it next to the internal code or query the code by the description

  • set_dict (DataFrame | None) – Optional if we have sets inside the client dictionary.

  • add_missing_codes (bool) – If True, will try to “trim” long codes to shorter ones and search for a match

Todo:

change signal to List

Return type:

None
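A usage sketch for def_dict and set_dict (the import path, the signal name, the codes, and the column names are illustrative assumptions; the docstring above only fixes the column order of def_dict):

```python
import pandas as pd

# from ETL_Infra.etl_process import prepare_dicts  # assumed import path

# First column: internal code used in the loading files; second: its description.
def_dict = pd.DataFrame({
    "code":        ["E11", "I10"],
    "description": ["Type 2 diabetes mellitus", "Essential hypertension"],
})

# Optional set membership inside the client dictionary (set <- member), illustrative:
set_dict = pd.DataFrame({
    "set":    ["Chronic_Conditions", "Chronic_Conditions"],
    "member": ["E11", "I10"],
})

# prepare_dicts("/path/to/workdir", "DIAGNOSIS", def_dict=def_dict, set_dict=set_dict)
print(len(def_dict))  # 2
```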

ETL_Infra.etl_process.prepare_final_signals(data, workdir, sigs, batch_size=0, start_write_batch=None, skip_batch_tests=False, editor=None, map_editor=None, override='n', interactive=False)[source]

The main function to generate FinalSignals using batches of DataFrames.

The code will do the following for each batch:

  1. When “pid” is not numeric, a conversion is needed. If the signal is part of “demographic”, it will create a mapping file from the string to a numeric ID. If the signal is not demographic, it will use that mapping to convert the pid to numeric. The mapping will be stored inside workdir/FinalSignals/ID2NR.

  2. If there is a “signal” column and the signal is “unknown”, it will try to use “config/map.tsv” to map the signal to an existing signal.

  3. Running the suitable processing unit from CODE_DIR/signal_processings/$SIGNAL_NAME_OR_PROCESSING_UNIT_NAME_OR_TAG.py. The most specific code will be called, with a logic similar to class inheritance: if my signal is Hemoglobin and it is also tagged “cbc” and “labs”, it will first look for “Hemoglobin.py”, then “cbc.py” and then “labs.py”. If the code file is missing or the directory doesn’t exist, the code will create the directory and a file with instructions. The signal will be determined by the “signal” column if not null; otherwise it will use the “sigs” parameter.

  4. Testing the result dataframe - first, signal format testing: the time channel is an integer with valid dates, the value is numeric, categorical values don’t contain invalid characters, etc. Then deeper tests, which can be extended based on the signal labels, will be executed, like testing the value range.

  5. Sorting, and storing the file in the right place under WORK_DIR/FinalSignals.
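The processing-unit lookup described above (“Hemoglobin.py”, then “cbc.py”, then “labs.py”) can be sketched as a first-match search (illustrative only; the actual implementation lives inside prepare_final_signals):

```python
def resolve_processing_unit(signal, tags, existing_files):
    """Return the first matching processing script name, most specific first.

    `tags` go from most to least specific, e.g. ["cbc", "labs"] for Hemoglobin.
    Returns None when nothing matches (the library then creates the directory
    and a file with instructions)."""
    for name in [signal] + list(tags):
        candidate = name + ".py"
        if candidate in existing_files:
            return candidate
    return None

files = {"cbc.py", "labs.py"}  # no Hemoglobin.py in signal_processings/
print(resolve_processing_unit("Hemoglobin", ["cbc", "labs"], files))  # cbc.py
```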

Parameters:
  • data (Callable[[int, int], Generator[DataFrame, None, None]] | DataFrame) – A lazy iterator of DataFrames; the only constraint is that each has a “pid” column. To use this iterator, we specify 2 integers: batch size and starting batch position.

  • workdir (str) – The working directory where we stored all the ETL outputs

  • sigs (str) – Comma-separated names of logic units to execute on each DataFrame for records without a “signal” value

  • batch_size (int) – A parameter to control the batch size passed to the lazy iterator

  • start_write_batch (int | None) – If multiple data sources want to generate the same “signal”, we want to avoid overrides in FinalSignals. This is our way to handle this: give each call of prepare_final_signals a different “batch” number for writing the output.

  • skip_batch_tests (bool) – This controls if to skip tests in between batches

  • override (Literal['y', 'n']) – If “y”, will redo the processing and will not take the current status into account. The default is “n” - no: use the current state, skip completed batches, or skip this processing entirely if all batches are completed.

  • editor (str | None)

  • map_editor (str | None)

  • interactive (bool)

Returns:

True if finished successfully.

Return type:

bool
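A sketch of the lazy-iterator shape the data parameter expects: a callable taking batch size and starting batch position, yielding DataFrames with a “pid” column. The import path, signal name, and batching-by-pid policy are assumptions for illustration:

```python
import pandas as pd

# from ETL_Infra.etl_process import prepare_final_signals  # assumed import path

raw = pd.DataFrame({
    "pid":     [1, 1, 2, 2, 3],
    "value_0": [13.1, 12.9, 14.2, 13.8, 12.5],
})

def data(batch_size, start_batch):
    """Lazy iterator over `raw`: yields DataFrames of `batch_size` pids,
    starting at batch position `start_batch` (illustrative batching policy)."""
    pids = sorted(raw["pid"].unique())
    step = batch_size if batch_size > 0 else len(pids)
    for i in range(start_batch * step, len(pids), step):
        yield raw[raw["pid"].isin(pids[i:i + step])]

# prepare_final_signals(data, "/path/to/workdir", sigs="Hemoglobin", batch_size=2)
batches = list(data(2, 0))
print([len(b) for b in batches])  # [4, 1]
```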

ETL_Infra.plot_graph module

plot_graph.get_plotly_js()[source]
plot_graph.plot_graph(obj, save_path, title='Test', mode='markers+lines', javascript_path='W:\\Graph_Infra\\plotly-latest.min.js')[source]

Method to plot graph using plotly

Parameters:
  • obj (Dict[str, DataFrame] | DataFrame) – Dictionary from the name of the series to a DataFrame with the data. The first column is the x axis data, the second is the y axis data.

  • save_path (str) – path to store the html file

  • title (str) – The graph title

  • mode (str) – controls the graph type: “bar” or options for scatter

  • javascript_path (str) – controls the path to javascript

Return type:

None
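A usage sketch (the import path, series name, and column names are illustrative; only the dict-of-DataFrames shape and the x/y column order come from the docstring above):

```python
import pandas as pd

# from ETL_Infra.plot_graph import plot_graph  # assumed import path

# One DataFrame per series; first column is x, second is y.
series = {
    "hemoglobin_by_year": pd.DataFrame({
        "year":     [2019, 2020, 2021],
        "mean_val": [13.2, 13.4, 13.1],
    }),
}

# plot_graph(series, "/path/to/out.html", title="Hemoglobin trend",
#            mode="markers+lines")
print(list(series))  # ['hemoglobin_by_year']
```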

ETL_Infra.unit_conversions module

class ETL_Infra.unit_conversions.SuggestionResult(direction, _bias, _factor, _distance, _desc, _target, _grp_size, _current_median, _expected_median)[source]

Bases: object

A class that holds a unit-suggestion linear transformation

Parameters:
  • direction (bool)

  • _bias (float)

  • _factor (float)

  • _distance (float)

  • _desc (str)

  • _target (str)

  • _grp_size (int)

  • _current_median (float)

  • _expected_median (float)

bias: float
current_median: float
description: str
distance: float
expected_median: float
factor: float
group_size: int
opposite_transformation: bool
target_unit: str
ETL_Infra.unit_conversions.find_best_unit_suggestion(df, allow_op=False, diff_threshold_percentage=0.5, diff_threshold_ratio=3, min_grp_size=500)[source]

Function that receives a dataframe with multiple signals and units and breaks the results down by each signal,unit group into a ranking of the most suitable linear unit transformations [bias + factor] for that group. It will also return the distance from the expected median value of the target unit.

Parameters:
  • df (DataFrame) – DataFrame input with signal, value_0 column

  • allow_op (bool) – If True, will also test the opposite transformation of the linear suggestion.

  • diff_threshold_percentage (float) – How much difference, as a fraction [0-1], from the best match we allow while continuing to suggest other options

  • min_grp_size (int) – Groups smaller than this size will be ignored

  • diff_threshold_ratio (float)

Return type:

Dict[Tuple[str, str] | str, List[SuggestionResult]]
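A usage sketch showing the input shape and how a candidate transformation applies (the import path and the g/L → g/dL example are assumptions; only the "signal"/"value_0" columns and the bias/factor form come from the docs above):

```python
import pandas as pd

# from ETL_Infra.unit_conversions import find_best_unit_suggestion  # assumed

df = pd.DataFrame({
    "signal":  ["Hemoglobin"] * 3 + ["Glucose"] * 3,
    "unit":    ["g/L"] * 3 + ["mg/dL"] * 3,
    "value_0": [128.0, 135.0, 142.0, 90.0, 101.0, 110.0],
})

# suggestions = find_best_unit_suggestion(df, allow_op=False, min_grp_size=1)
# for (signal, unit), ranked in suggestions.items():
#     best = ranked[0]  # SuggestionResult: bias, factor, distance, target_unit, ...
#     print(signal, unit, "->", best.target_unit, best.bias, best.factor)

# Each candidate is a linear transformation value -> bias + factor * value,
# e.g. g/L -> g/dL with bias 0 and factor 0.1 applied to the group median:
print(0 + 0.1 * df.loc[df["unit"] == "g/L", "value_0"].median())  # 13.5
```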

ETL_Infra.unit_conversions.find_best_unit_suggestion_to_group(df, optional_factors=None, allow_op=False, diff_threshold_percentage=0.5, diff_threshold_ratio=3, signal_value_stats_df=None, unit_suggestion_cfg=None, signal_info=None, group_name=None)[source]

Function that receives a dataframe with raw values and optional transformation tuples [bias, factor] and chooses the best one

Parameters:
  • df (DataFrame) – DataFrame input with signal, value_0 column

  • optional_factors (List[Tuple[float, float, str]] | None) – List of options for linear transformations - each tuple holds [bias, factor] plus a description string

  • allow_op (bool) – If True, will also test the opposite transformation of the linear suggestion.

  • diff_threshold_percentage (float) – How much difference, as a fraction [0-1], from the best match we allow while continuing to suggest other options

  • signal_value_stats_df (DataFrame | None) – A dataframe with signal and percentile stats - median value. If not given, it will be read again from the resources directory.

  • unit_suggestion_cfg (DataFrame | None) – A dataframe with unit suggestion transformations for each signal,unit. If not given, it will be read again from the resources directory.

  • signal_info (Dict[str, SignalInfo] | None) – An object that holds signal information, e.g. what the target unit is. If None, it will be read from the resources directory.

  • diff_threshold_ratio (float)

  • group_name (tuple[str, str] | None)

Return type:

List[SuggestionResult] | None

ETL_Infra.unit_conversions.try_get_quantile(signal_value_stats_df, prc, signal_name)[source]
Parameters:
  • signal_value_stats_df (DataFrame)

  • prc (float)

  • signal_name (str)

Return type:

float | None