Source code for ETL_Infra.unit_conversions

#!/usr/bin/env python
# coding: utf-8
import os
import sys
import argparse
import datetime as dt
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
from .env import *
from .define_signals import SignalInfo, load_signals_map


def read_signals_defs() -> Dict[str, SignalInfo]:
    """
    :meta private:
    """
    codedir = get_codedir()
    return load_signals_map(FULL_SIGNALS_TYPES, codedir)


def get_stats(fullDFi, inFile=None, signals_def=None, samples_per_signal=5):
    """
    :meta private:
    """
    fullDF = fullDFi[fullDFi["unit"].notnull()].reset_index()
    if len(fullDF) == 0:
        return None
    freqSignal = fullDF["signal"].value_counts().reset_index()
    freqUnit = fullDF.value_counts(subset=["signal", "unit"], dropna=False)

    if "index" in freqSignal.columns:
        freqSignal = freqSignal.rename(columns={"signal": "count", "index": "signal"})
    inData = freqSignal.merge(
        freqUnit.to_frame()
        .reset_index()
        .rename(columns={0: "unitCount", "count": "unitCount"}),
        on="signal",
        how="left",
    ).sort_values(by=["count", "signal", "unitCount"], ascending=False)
    inData.loc[inData["unit"].isnull(), "unit"] = ""
    inData["unit"] = inData["unit"].astype(
        str
    )  # in case they are all nulls  and read as floats
    if inFile is None or not (os.path.exists(inFile)):
        inTable = pd.DataFrame(
            columns=[
                "signal",
                "unit",
                "to_signal",
                "to_unit",
                "multiple_by",
                "additive_b0",
                "value",
            ]
        )
    else:
        inTable = pd.read_csv(
            inFile, sep="\t", converters={"signal": str, "unit": str}
        ).rename(columns={"value_0": "value"})
        inTable = inTable[inTable["signal"] != ""].reset_index(
            drop=True
        )  # Remvoe empty lines
        if inTable.empty:
            inTable = pd.DataFrame(
                columns=[
                    "signal",
                    "unit",
                    "to_signal",
                    "to_unit",
                    "multiple_by",
                    "additive_b0",
                    "value",
                ]
            )

    inTable = inTable[inTable["signal"].notnull()].reset_index(drop=True)
    inTable["unit"] = inTable["unit"].astype(str)
    outTable = inData[["signal", "count", "unit", "unitCount"]].merge(
        inTable[
            [
                "signal",
                "unit",
                "to_signal",
                "to_unit",
                "multiple_by",
                "additive_b0",
                "value",
            ]
        ],
        how="outer",
        on=["signal", "unit"],
    )
    outTable = outTable.rename(columns={0: "unitCount"})
    if signals_def is not None:  # Fetch Target unit from known signal
        sig_names = list(map(lambda x: x[1].name, signals_def.items()))
        unit_vec = list(
            map(
                lambda x: x[1].units[0] if (len(x[1].units) > 0) else "",
                signals_def.items(),
            )
        )
        df_unit = pd.DataFrame({"to_signal_f": sig_names, "unit_official": unit_vec})
        df_unit["to_signal_f"] = df_unit["to_signal_f"].astype(str)
        outTable["to_signal_f"] = outTable["to_signal"]
        filter_cond = (outTable["to_signal_f"].isnull()) | (
            outTable["to_signal_f"] == ""
        )
        outTable.loc[filter_cond, "to_signal_f"] = outTable.loc[filter_cond, "signal"]
        outTable.loc[outTable["to_signal_f"].isnull(), "to_signal_f"] = ""
        outTable["to_signal_f"] = outTable["to_signal_f"].astype(str)
        outTable = outTable.merge(df_unit, how="left", on=["to_signal_f"])
        filter_cond = (outTable["unit_official"].notnull()) & (
            outTable["unit_official"] != ""
        )
        outTable.loc[filter_cond, "to_unit"] = outTable.loc[
            filter_cond, "unit_official"
        ]
        outTable = outTable.drop(columns=["unit_official", "to_signal_f"])

    samples = (
        fullDF[["signal", "unit", "value_0"]]
        .groupby(["signal", "unit"])
        .sample(n=samples_per_signal, replace=True)
    )
    outTableSample = (
        outTable.merge(samples, on=["signal", "unit"], how="left")
        .groupby(["signal", "unit"], dropna=False)
        .aggregate(lambda tdf: tdf.tolist())
    )
    # Remove Empty "value 0"
    if "value" in outTableSample.columns:  # Try fix nan
        outTableSample.loc[
            outTableSample["value_0"].apply(lambda x: np.isnan(x).all()), "value_0"
        ] = outTableSample.loc[
            outTableSample["value_0"].apply(lambda x: np.isnan(x).all()), "value"
        ].apply(
            lambda x: eval(x[0])
        )
        outTableSample = outTableSample.drop(columns=["value"])
        outTable = outTable.drop(columns=["value"])
    outTableSample1 = outTable.merge(
        outTableSample["value_0"], on=["signal", "unit"], how="left"
    )

    return outTableSample1

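# Illustrative sketch (not part of the original module; the signal and unit
# names below are hypothetical): get_stats expects a dataframe with "signal",
# "unit" and "value_0" columns and returns a per-(signal, unit) summary with
# sampled values, ready to be annotated with to_signal/to_unit/multiple_by/
# additive_b0 and saved as a conversion table.
#
#   toy = pd.DataFrame({
#       "signal": ["Glucose"] * 6,
#       "unit": ["mg/dL"] * 3 + ["mmol/L"] * 3,
#       "value_0": [95.0, 110.0, 88.0, 5.2, 6.1, 4.9],
#   })
#   stats = get_stats(toy, inFile=None, samples_per_signal=2)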

def process_unit_conversion(
    fullDF: pd.DataFrame, outFile: str, signals_def=None, samples_per_signal: int = 5
) -> None:
    """
    :meta private:
    """
    print("Start analyzing signal+units...", flush=True)
    outTable = get_stats(fullDF, outFile, signals_def, samples_per_signal)
    if outTable is not None:
        outTable.to_csv(outFile, index=False, sep="\t")
        print(f">>>>> outTable written to {outFile} <<<<<<", flush=True)

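# Hedged usage sketch (file names are hypothetical): process_unit_conversion
# merges any existing table at outFile back into the new statistics, so it can
# be re-run as new raw data arrives without losing hand-edited mappings.
#
#   raw = pd.read_pickle("raw_signals.pkl")
#   process_unit_conversion(raw, "unit_table.tsv")
#   # edit unit_table.tsv (to_signal, to_unit, multiple_by, additive_b0), then:
#   fixed = fix_units(raw, "unit_table.tsv")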

def fix_units(fullDF: pd.DataFrame, inFile: str) -> pd.DataFrame:
    """
    :meta private:
    """
    if not os.path.exists(inFile):
        raise NameError(
            "Please call process_unit_conversion or generate the config file first:\n%s"
            % (inFile)
        )
    print("Start fixing signal+units...", flush=True)
    inTable = pd.read_csv(inFile, sep="\t", converters={"signal": str, "unit": str})
    # Use signal if to_signal is empty (keep the same name) - you can filter these out later if desired
    empty_map_cond = (inTable["to_signal"].isnull()) | (inTable["to_signal"] == "")
    inTable["mapped"] = 1
    inTable.loc[empty_map_cond, "to_signal"] = inTable.loc[empty_map_cond, "signal"]
    inTable.loc[empty_map_cond, "mapped"] = 0
    df = fullDF
    inTable["unit"] = inTable["unit"].astype(str)
    numConvert = df.merge(
        inTable[
            ["signal", "unit", "to_signal", "multiple_by", "additive_b0", "mapped"]
        ],
        how="left",
        on=["signal", "unit"],
    )

    no_map = numConvert[numConvert["to_signal"].isnull()].reset_index(drop=True)
    if len(no_map) > 0:
        print(f"WARN :: There are {len(no_map)} records without mapping")
        print(no_map.head(10))
        print(
            "These are the most common unmapped signal+unit pairs:\n",
            no_map[["signal", "unit"]].value_counts().reset_index(),
        )
        raise NameError("No mapping found for some of the input signals")

    # Convert
    numConvert.multiple_by = pd.to_numeric(numConvert.multiple_by, errors="coerce")
    numConvert.additive_b0 = pd.to_numeric(numConvert.additive_b0, errors="coerce")
    numConvert.loc[numConvert.multiple_by.isna(), "multiple_by"] = 1  # Default 1
    numConvert.loc[numConvert.additive_b0.isna(), "additive_b0"] = 0  # Default 0
    numConvert["value_00"] = (
        numConvert["value_0"] * numConvert.multiple_by + numConvert.additive_b0
    )
    numConvert = numConvert.rename(
        columns={
            "value_0": "value_0.original",
            "value_00": "value_0",
            "signal": "signal.original",
            "to_signal": "signal",
        }
    ).drop(columns=["multiple_by", "additive_b0", "value_0.original"])
    return numConvert

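# Worked sketch (hypothetical table row, not shipped with the source): given a
# conversion-table line such as
#
#   signal   unit    to_signal  multiple_by  additive_b0
#   Glucose  mmol/L  Glucose    18.0         0
#
# fix_units rewrites each matching record as
#   value_0 = value_0 * multiple_by + additive_b0
# so a raw value of 5.5 mmol/L becomes 5.5 * 18.0 + 0 = 99.0 (roughly the
# mg/dL equivalent for glucose).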

class SuggestionResult:
    """
    A class that holds a unit suggestion linear transformation
    """

    bias: float
    factor: float
    distance: float
    opposite_transformation: bool
    description: str
    target_unit: str
    group_size: int
    current_median: float
    expected_median: float

    def __init__(
        self,
        direction: bool,
        _bias: float,
        _factor: float,
        _distance: float,
        _desc: str,
        _target: str,
        _grp_size: int,
        _current_median: float,
        _expected_median: float,
    ):
        self.bias = _bias
        self.factor = _factor
        self.distance = _distance
        self.opposite_transformation = direction
        self.description = _desc
        self.target_unit = _target
        self.group_size = _grp_size
        self.current_median = _current_median
        self.expected_median = _expected_median

    def __repr__(self):
        ss = (
            f"Size: {self.group_size}, bias: {self.bias}, factor: {self.factor},"
            f" distance_from_target_median: {self.distance},"
            f" target_unit: {self.target_unit}, guessed_unit: {self.description}"
            f", current_fixed: {self.current_median}, expected: {self.expected_median}"
        )
        if self.opposite_transformation:
            return ss + ", reversed_transform!"
        else:
            return ss
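
# Minimal sketch (illustrative values only): constructing and printing a
# SuggestionResult directly.
#
#   sr = SuggestionResult(
#       direction=False, _bias=0.0, _factor=18.0, _distance=1.5,
#       _desc="mmol/L -> mg/dL", _target="mg/dL", _grp_size=1200,
#       _current_median=98.5, _expected_median=100.0,
#   )
#   print(sr)  # repr summarizes group size, bias, factor, distance and medians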


def find_best_unit_suggestion(
    df: pd.DataFrame,
    allow_op: bool = False,
    diff_threshold_percentage: float = 0.5,
    diff_threshold_ratio: float = 3,
    min_grp_size: int = 500,
) -> Dict[Tuple[str, str] | str, List[SuggestionResult]]:
    """
    Receives a dataframe with multiple signals and units, breaks the results
    down by each (signal, unit) group, and ranks the most suitable linear unit
    transformations [bias + factor] for each group. It also returns the
    distance from the expected median value of the target unit.

    :param df: DataFrame input with signal, value_0 columns
    :param allow_op: If true, also test the opposite transformation of each linear suggestion.
    :param diff_threshold_percentage: Maximal difference from the best match, as a fraction [0-1], for which further options are still suggested
    :param diff_threshold_ratio: Maximal allowed ratio between a candidate's fixed median and the target median; options beyond it are dropped
    :param min_grp_size: Groups smaller than this size are ignored
    """
    all_sigs_config = read_signals_defs()
    current_dir = os.path.dirname(__file__)
    signal_value_stats_df = pd.read_csv(
        os.path.join(current_dir, "rep_signals", "signals_prctile.cfg"), sep="\t"
    )
    unit_suggestion_cfg = pd.read_csv(
        os.path.join(current_dir, "rep_signals", "unit_suggestions.cfg"),
        sep="\t",
        names=["signal", "bias", "factor", "description"],
    )
    subset = ["signal"]
    if "unit" in df.columns:
        subset.append("unit")
    res_full = dict()
    skip_cols = set(["pid", "signal"])
    additional_cols = list(
        filter(
            lambda x: x not in skip_cols
            and not x.startswith("time_")
            and not x.startswith("value_"),
            df.columns,
        )
    )
    first_tm = True
    grp_name = "GROUP_COLS"
    for col in additional_cols:
        if first_tm:
            df[grp_name] = f"{col}:" + df[col].astype(str)
        else:
            df[grp_name] = df[grp_name] + f"|{col}:" + df[col].astype(str)
        first_tm = False
    if len(additional_cols) > 0:
        subset = ["signal", grp_name]

    for group_key, group_df in df.groupby(by=subset):
        grp_size = len(group_df)
        if grp_size < min_grp_size:
            print(f"Skip {group_key} too small {grp_size}")
            continue
        res_full[group_key] = find_best_unit_suggestion_to_group(
            group_df,
            None,
            allow_op,
            diff_threshold_percentage,
            diff_threshold_ratio,
            signal_value_stats_df,
            unit_suggestion_cfg,
            all_sigs_config,
            group_key,
        )
    return res_full
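
# Hedged usage sketch (file and signal names are hypothetical): rank unit
# suggestions for every sufficiently large group in a raw dataframe.
#
#   df = pd.read_pickle("raw_signals.pkl")   # needs "signal" and "value_0"
#   suggestions = find_best_unit_suggestion(df, allow_op=True)
#   for group_key, ranked in suggestions.items():
#       if ranked:                            # None when no reference exists
#           print(group_key, ranked[0])       # best-ranked SuggestionResult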


def try_get_quantile(
    signal_value_stats_df: pd.DataFrame, prc: float, signal_name: str
) -> float | None:
    signal_value_stats_df_ = signal_value_stats_df[
        signal_value_stats_df["q"] == prc
    ].reset_index(drop=True)
    if len(signal_value_stats_df_) == 0:
        print(
            f"Warning, no reference for signal {signal_name} to suggest unit transformation",
            flush=True,
        )
        return None
    target_median = signal_value_stats_df_["reference"].iloc[0]
    return target_median
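
# Minimal sketch (toy reference frame, not the shipped signals_prctile.cfg;
# note the frame here is already filtered to one signal and renamed, as the
# caller does before invoking this helper):
#
#   ref = pd.DataFrame({"q": [0.5, 0.9], "reference": [100.0, 180.0]})
#   try_get_quantile(ref, 0.5, "Glucose")    # -> 100.0
#   try_get_quantile(ref, 0.25, "Glucose")   # -> None, after printing a warning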


def find_best_unit_suggestion_to_group(
    df: pd.DataFrame,
    optional_factors: List[Tuple[float, float, str]] | None = None,
    allow_op: bool = False,
    diff_threshold_percentage: float = 0.5,
    diff_threshold_ratio: float = 3,
    signal_value_stats_df: pd.DataFrame | None = None,
    unit_suggestion_cfg: pd.DataFrame | None = None,
    signal_info: Dict[str, SignalInfo] | None = None,
    group_name: tuple[str, str] | None = None,
) -> List[SuggestionResult] | None:
    """
    Receives a dataframe with raw values and optional transformation tuples
    [bias, factor] and chooses the best one.

    :param df: DataFrame input with signal, value_0 columns
    :param optional_factors: List of options for linear transformations - each tuple is [bias, factor, description]
    :param allow_op: If true, also test the opposite transformation of each linear suggestion.
    :param diff_threshold_percentage: Maximal difference from the best match, as a fraction [0-1], for which further options are still suggested
    :param diff_threshold_ratio: Maximal allowed ratio between a candidate's fixed median and the target median; options beyond it are dropped
    :param signal_value_stats_df: A dataframe with signal and percentile stats - median value. If not given, it is read again from the resources directory.
    :param unit_suggestion_cfg: A dataframe with suggested unit transformations for each signal, unit. If not given, it is read again from the resources directory.
    :param signal_info: An object that holds signal information, including the target unit. If None, it is read from the resources directory.
    :param group_name: Group identifier, used only in error messages
    """
    current_dir = os.path.dirname(__file__)
    if signal_value_stats_df is None:
        signal_value_stats_df = pd.read_csv(
            os.path.join(current_dir, "rep_signals", "signals_prctile.cfg"), sep="\t"
        )
    signal_name = df["signal"].iloc[0]
    if optional_factors is None or len(optional_factors) == 0:
        # print(f'Reading suggestion from config file', flush=True)
        if unit_suggestion_cfg is None:
            unit_suggestion_cfg = pd.read_csv(
                os.path.join(current_dir, "rep_signals", "unit_suggestions.cfg"),
                sep="\t",
                names=["signal", "bias", "factor", "description"],
            )
        unit_suggestion_cfg = unit_suggestion_cfg[
            unit_suggestion_cfg["signal"] == signal_name
        ].reset_index(drop=True)
        if len(unit_suggestion_cfg) == 0:  # type: ignore
            print(
                f"Warning, no options, optional_factors is empty for {signal_name}",
                flush=True,
            )
            return None
        optional_factors = list(
            unit_suggestion_cfg[["bias", "factor", "description"]].values  # type: ignore
        )
    grp_size = len(df)
    prc = 0.5
    signal_value_stats_df = (
        signal_value_stats_df[signal_value_stats_df["signal"] == signal_name]
        .reset_index(drop=True)[["q", "value_0"]]
        .rename(columns={"value_0": "reference"})
    )
    target_median = try_get_quantile(signal_value_stats_df, prc, signal_name)  # type: ignore
    if target_median is None or target_median == 0:
        prc = 0.9
        target_median = try_get_quantile(signal_value_stats_df, prc, signal_name)  # type: ignore
        if target_median is None or target_median == 0:
            print(f"WARN: signal {signal_name} has quantile {target_median} - skipping")
            return None
    if prc != 0.5:
        print(f"INFO :: Comparing quantile {prc} for signal {signal_name}")
    current_median = df["value_0"].quantile(q=prc)
    if current_median == 0:
        print(
            f"Current quantile {prc} for {signal_name} is 0 - skipping unit suggestion"
        )
        return None
    # Iterate over all options and find the best "match", i.e. the closest one.
    # Compare if there is more than one good option:
    opt_rank = []  # entries: [index+1 in optional_factors (negative for the opposite transform, None for identity), distance, fixed_median]
    for idx, tp in enumerate(optional_factors):
        idx += 1
        bias, factor, description = tp
        if bias == 0 and factor == 1:
            continue  # I already have this option
        fixed_median = factor * current_median + bias
        opt_distance = abs(target_median - fixed_median)
        opt_rank.append([idx, opt_distance, fixed_median])
        if allow_op:
            # Let's check the opposite transformation:
            fixed_median = (current_median - bias) / factor
            opt_distance = abs(target_median - fixed_median)
            opt_rank.append([-idx, opt_distance, fixed_median])
    # Last option - treat it as [0, 1], i.e. no bias, no factor, keep as is.
    opt_distance = abs(current_median - target_median)
    opt_rank.append([None, opt_distance, current_median])
    # Sort by distance
    opt_rank = sorted(opt_rank, key=lambda x: x[1])
    # Choose the best option and check that the next one is far enough behind.
    # Filter by diff_threshold_ratio from opt_rank:
    if target_median > 0:
        opt_rank = list(
            filter(
                lambda x: (
                    max(x[2] / target_median, target_median / x[2])
                    <= diff_threshold_ratio
                    if x[2] > 0
                    else False
                ),
                opt_rank,
            )
        )
    if len(opt_rank) == 0:
        raise Exception(
            f"No suitable unit found for signal {signal_name} - {group_name} - none of the options fits. Expected median {target_median}, current median: {current_median}"
        )
    _, best_option_distance, _ = opt_rank[0]
    till_idx = 0
    while best_option_distance == 0 and till_idx + 1 < len(opt_rank):
        till_idx += 1
        _, best_option_distance, _ = opt_rank[till_idx]
        best_option_distance = min(
            best_option_distance, 5
        )  # Since the previous one was 0, don't allow more than 5
    for idx, opt_tp in enumerate(opt_rank[till_idx:]):
        _, opt_dist, _ = opt_tp
        if best_option_distance == 0:
            break
        if opt_dist / best_option_distance - 1 > diff_threshold_percentage:
            break
        till_idx += 1
    opt_rank = opt_rank[:till_idx]
    # Now return opt_rank in this format: [direction:bool, bias, factor, distance]
    res = []
    target_unit = ""
    if signal_info is not None and signal_name in signal_info:
        si = signal_info[signal_name]
        target_unit = ",".join(si.units)
    for idx, distance, fixed_median in opt_rank:
        bias = 0
        factor = 1
        description = ""
        if idx is not None:
            access_idx = abs(idx) - 1
            bias = optional_factors[access_idx][0]
            factor = optional_factors[access_idx][1]
            description = optional_factors[access_idx][2]
            if idx < 0:
                description += " Reversed"
        opt_res = SuggestionResult(
            idx is not None and idx < 0,
            bias,
            factor,
            distance,
            description,
            target_unit,
            grp_size,
            fixed_median,
            target_median,
        )
        res.append(opt_res)
    return res
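
# Hedged sketch (toy numbers): scoring a single group against explicit
# candidate transformations instead of the shipped unit_suggestions.cfg.
#
#   grp = pd.DataFrame({"signal": ["Glucose"] * 4,
#                       "value_0": [5.0, 5.5, 6.0, 5.2]})
#   ref = pd.DataFrame({"signal": ["Glucose"], "q": [0.5], "value_0": [100.0]})
#   ranked = find_best_unit_suggestion_to_group(
#       grp,
#       optional_factors=[(0.0, 18.0, "mmol/L -> mg/dL")],  # (bias, factor, desc)
#       signal_value_stats_df=ref,
#   )
#   # the scaled median 18.0 * 5.35 = 96.3 is closest to the reference 100.0,
#   # so ranked[0].factor == 18.0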


if __name__ == "__main__":
    # get parameters
    parser = argparse.ArgumentParser(
        description="Prepare signals for load. Convert units"
    )
    parser.add_argument(
        "--raw_pkl", required=True, help="raw pkl file 'id signal date value unit'"
    )
    parser.add_argument("--out_table", required=True, help="updated conversion table")
    parser.add_argument(
        "--samples_per_signal",
        type=int,  # parse as int so downstream sampling receives a number
        help="number of samples presented from each signal unit combination",
        default=5,
    )
    args = parser.parse_args()

    fullDF = pd.read_pickle(args.raw_pkl)
    fullDF = fullDF[fullDF.value.notna()]
    process_unit_conversion(
        fullDF, args.out_table, samples_per_signal=args.samples_per_signal
    )
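
# Example invocation (hypothetical file names; run as a module because of the
# relative imports above):
#   python -m ETL_Infra.unit_conversions --raw_pkl raw_signals.pkl --out_table unit_table.tsv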