ETL_Infra data retrieval

ETL_Infra.data_fetcher.db_fetcher module

class data_fetcher.DB_Access(connection_string, connect_args=None)[source]

Initialize the database object with a connection string

Parameters:
  • connection_string (str) – the connection string

  • connect_args – Additional connection arguments passed as sqlalchemy's “connect_args” to create_engine

data_fetcher.db_fetcher.db_fetcher(db, sql_query, batch_size, start_batch, batch_mapper)[source]

A helper function to retrieve data from the DB in batches.

This will sort the query by “pid” to allow fetching in batches and resuming from where the run stopped. The last patient id reached in each batch is stored under batch_mapper.

Parameters:
  • db (DB_Access) – The database object DB_Access

  • sql_query (str) – The sql query to fetch results

  • batch_size (int) – The size of the batch, i.e. the number of rows to read in each batch

  • start_batch (int) – The starting batch, to continue execution from the middle of a previous run

  • batch_mapper (Dict[int, str]) – A dictionary to map batch starting point to last patient id read in that batch

Returns:

lazy data iterator to retrieve the data

Return type:

Generator[DataFrame, None, None]
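The batching and resume contract described above can be sketched in stdlib-only Python. This is an illustration of the behavior, not ETL_Infra's implementation: DB_Access and the SQL query are replaced by an in-memory list of (pid, value) rows, and batch_mapper is assumed to key batches by batch index.

```python
# Stdlib-only sketch of db_fetcher's batching contract: sort by "pid",
# yield fixed-size batches, and record the last pid of each batch in
# batch_mapper so a later run can resume. Names are illustrative.
from typing import Dict, Generator, List, Tuple

Row = Tuple[int, str]

def batched_fetch(rows: List[Row], batch_size: int, start_batch: int,
                  batch_mapper: Dict[int, str]) -> Generator[List[Row], None, None]:
    # Sort by pid so batch boundaries are deterministic and resumable.
    rows = sorted(rows, key=lambda r: r[0])
    n_batches = (len(rows) + batch_size - 1) // batch_size
    for batch_idx in range(start_batch, n_batches):
        batch = rows[batch_idx * batch_size:(batch_idx + 1) * batch_size]
        if not batch:
            break
        # Record the last pid seen in this batch to allow resuming later.
        batch_mapper[batch_idx] = str(batch[-1][0])
        yield batch

mapper: Dict[int, str] = {}
data = [(3, "c"), (1, "a"), (2, "b"), (4, "d")]
batches = list(batched_fetch(data, batch_size=2, start_batch=0, batch_mapper=mapper))
# batches == [[(1, "a"), (2, "b")], [(3, "c"), (4, "d")]], mapper == {0: "2", 1: "4"}
```

Passing start_batch=1 with the same mapper would skip the first batch and yield only the second, which is the resume behavior the real function provides for interrupted runs.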

ETL_Infra.data_fetcher.files_fetcher module

data_fetcher.files_fetcher.big_files_fetcher(files, batch_size, file_parser, has_header=True, start_batch=0)[source]

A helper function to create data generator from list of big files.

The batching is done at the row level: “1” means process one row per batch, and 0 means no batching (read everything). It will read several files until it reaches “batch_size” rows, or part of a single file if that file is bigger than “batch_size”.

Parameters:
  • files (List[str]) – list of file paths to process

  • batch_size (int) – the batch size in number of rows to process

  • file_parser (Callable[[str, int, int], Generator[DataFrame, None, None]]) – a function to “read” each file path into a DataFrame with at least a “pid” column. The function receives the file path, the number of lines to read from the file (batch_size), and how many lines to skip from the file (start_from), useful when we want to continue a run from the middle of a big file. Example reading with pandas: df_i=pd.read_csv($FILE_PATH, skiprows=start_from_row, chunksize=batch_size)

  • has_header (bool) – A flag to indicate if file contains header

  • start_batch (int) – Starting position of the batch

Returns:

a lazy data generator to fetch the DataFrames

Return type:

Generator[DataFrame, None, None]
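A file_parser with the (file_path, batch_size, start_from) signature described above can be sketched with the stdlib csv module. The real parser would typically yield pandas DataFrames (e.g. via pd.read_csv with skiprows and chunksize, as in the docstring); here plain row lists stand in, and the function name is illustrative.

```python
# Illustrative file_parser for big_files_fetcher: skip the header, skip
# start_from already-processed rows, then yield chunks of batch_size rows.
import csv
from typing import Generator, List

def csv_parser(file_path: str, batch_size: int,
               start_from: int) -> Generator[List[List[str]], None, None]:
    with open(file_path, newline="") as fh:
        reader = csv.reader(fh)
        next(reader, None)                # skip the header row
        for _ in range(start_from):       # skip rows read in earlier batches
            next(reader, None)
        chunk: List[List[str]] = []
        for row in reader:
            chunk.append(row)
            if len(chunk) == batch_size:
                yield chunk
                chunk = []
        if chunk:                         # trailing partial batch
            yield chunk
```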

data_fetcher.files_fetcher.files_fetcher(files, batch_size, file_parser, start_batch=0)[source]

A helper function to create data generator from list of files.

The batching is done at the file level: “1” means process each file in a separate batch, and 0 means no batching.

Parameters:
  • files (List[str]) – list of file paths to process

  • batch_size (int) – the batch size in number of files to process

  • file_parser (Callable[[str], DataFrame]) – a function to “read” each file path into a DataFrame with at least a “pid” column

  • start_batch (int) – Starting position of the batch

Returns:

a lazy data generator to fetch the DataFrames

Return type:

Generator[DataFrame, None, None]
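The file-level batching rule above (batch_size counts files, 1 means one file per batch, 0 means no batching) can be sketched as follows. Function names and the 0-means-everything behavior for start_batch slicing are illustrative assumptions, not ETL_Infra's code.

```python
# Stdlib sketch of file-level batching: group the file list into groups of
# batch_size paths; each group would then be parsed and concatenated.
from typing import Generator, List

def file_batches(files: List[str], batch_size: int,
                 start_batch: int = 0) -> Generator[List[str], None, None]:
    if batch_size <= 0:          # 0 means no batching: everything in one batch
        yield files[start_batch:]
        return
    for i in range(start_batch * batch_size, len(files), batch_size):
        yield files[i:i + batch_size]

# batch_size=1: each file is its own batch
assert list(file_batches(["a.csv", "b.csv"], 1)) == [["a.csv"], ["b.csv"]]
```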

data_fetcher.files_fetcher.list_directory_files(base_path, file_search)[source]

A helper function to list all files in a directory that match a given regex

Parameters:
  • base_path (str) – the directory path

  • file_search (str) – the regex to search in the directory

Returns:

list of files matching the regex

Return type:

List[str]
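A plausible stdlib implementation of this directory listing is shown below. The function name, the sorted output, and searching the regex against the file name only are assumptions; the actual ETL_Infra code may differ.

```python
# Sketch of list_directory_files: return full paths of directory entries
# whose names match the given regex.
import os
import re
from typing import List

def list_matching_files(base_path: str, file_search: str) -> List[str]:
    pattern = re.compile(file_search)
    return sorted(
        os.path.join(base_path, name)
        for name in os.listdir(base_path)
        if pattern.search(name)
    )
```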