ETL_Infra data retrieval¶
ETL_Infra.data_fetcher.db_fetcher module¶
- class data_fetcher.DB_Access(connection_string, connect_args=None)[source]¶
Initialize a database object with a connection string (see the construction sketch below)
- Parameters:
connection_string (str) – the connection string
connect_args – Additional connection arguments, passed as the sqlalchemy “connect_args” in create_engine
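A minimal construction sketch, assuming the class is importable from ETL_Infra.data_fetcher.db_fetcher; the PostgreSQL connection string and the connect_args values are illustrative, not taken from this documentation:

    from ETL_Infra.data_fetcher.db_fetcher import DB_Access

    # Hypothetical connection string; any sqlalchemy-supported URL works here.
    db = DB_Access(
        "postgresql://user:password@dbhost:5432/clinical_db",
        connect_args={"connect_timeout": 10},  # forwarded to sqlalchemy create_engine
    )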
- data_fetcher.db_fetcher.db_fetcher(db, sql_query, batch_size, start_batch, batch_mapper)[source]¶
A helper function to retrieve data from the database in batches.
It adds an ORDER BY on “pid” to the query so that results can be fetched in batches
and the run can continue from where it stopped. The last patient id
reached in each batch is stored under batch_mapper (see the usage sketch below).
- Parameters:
db (DB_Access) – The DB_Access database object
sql_query (str) – The SQL query to fetch the results
batch_size (int) – The batch size, i.e. the number of rows to read in each batch
start_batch (int) – The starting batch, used to continue execution from the middle of a run
batch_mapper (Dict[int, str]) – A dictionary mapping each batch starting point to the last patient id read in that batch
- Returns:
a lazy data generator to retrieve the data
- Return type:
Generator[DataFrame, None, None]
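A usage sketch, assuming the same import path as above; the connection string, the query, and the printed summary are illustrative. The batch_mapper dictionary is populated by db_fetcher itself, so a later run can pass the same mapper with a non-zero start_batch to resume:

    from ETL_Infra.data_fetcher.db_fetcher import DB_Access, db_fetcher

    db = DB_Access("postgresql://user:password@dbhost:5432/clinical_db")
    batch_mapper = {}  # filled by db_fetcher: batch index -> last pid read in that batch

    sql_query = "SELECT pid, signal, value FROM labs"  # hypothetical query
    for df in db_fetcher(db, sql_query, batch_size=100000, start_batch=0,
                         batch_mapper=batch_mapper):
        print(df.shape)  # each df holds up to batch_size rows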
ETL_Infra.data_fetcher.files_fetcher module¶
- data_fetcher.files_fetcher.big_files_fetcher(files, batch_size, file_parser, has_header=True, start_batch=0)[source]¶
A helper function to create a data generator from a list of big files.
Batching is done at the level of rows, so “1” means process 1 row per batch, and 0 means no batching (read everything).
It will read several files until it reaches “batch_size” rows, or part of a single
file if that file is bigger than “batch_size” (see the sketch after this entry).
- Parameters:
files (List[str]) – list of file paths to process
batch_size (int) – the batch size in number of rows to process
file_parser (Callable[[str, int, int], Generator[DataFrame, None, None]]) – a function to “read” each file path into a DataFrame with at least a “pid” column. The function receives the file path, the number of lines to read from the file (batch_size), and the number of lines to skip from the file (start_from), which is useful when resuming a run from the middle of a big file. Example reading with pandas: df_i = pd.read_csv($FILE_PATH, skiprows=start_from_row, chunksize=batch_size)
has_header (bool) – A flag indicating whether the file contains a header row
start_batch (int) – Starting position of the batch
- Returns:
a lazy data generator to fetch the DataFrames
- Return type:
Generator[DataFrame, None, None]
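A sketch of a compatible file_parser, following the pandas example from the docstring above; the file names are hypothetical, and note that an integer skiprows also counts the header line, so header handling may need adjusting for your files:

    import pandas as pd
    from ETL_Infra.data_fetcher.files_fetcher import big_files_fetcher

    def parse_csv(path, batch_size, start_from_row):
        # Mirrors the docstring's example: skip already-processed rows and
        # yield DataFrames of up to batch_size rows each.
        yield from pd.read_csv(path, skiprows=start_from_row, chunksize=batch_size)

    files = ["labs_2023.csv", "labs_2024.csv"]  # hypothetical files with a "pid" column
    for df in big_files_fetcher(files, batch_size=500000, file_parser=parse_csv):
        print(df.shape)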
- data_fetcher.files_fetcher.files_fetcher(files, batch_size, file_parser, start_batch=0)[source]¶
A helper function to create a data generator from a list of files.
Batching is done at the level of whole files,
so “1” means process each file in a separate batch, and 0 means no batching (see the sketch after this entry).
- Parameters:
files (List[str]) – list of file paths to process
batch_size (int) – the batch size in number of files to process
file_parser (Callable[[str], DataFrame]) – a function to “read” each file path into a DataFrame with at least a “pid” column
start_batch (int) – Starting position of the batch
- Returns:
a lazy data generator to fetch the DataFrames
- Return type:
Generator[DataFrame, None, None]
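A sketch with a whole-file parser; the file names are hypothetical:

    import pandas as pd
    from ETL_Infra.data_fetcher.files_fetcher import files_fetcher

    def parse_file(path):
        # Each file is read into a single DataFrame with at least a "pid" column.
        return pd.read_csv(path)

    files = ["demographics.csv", "labs.csv"]  # hypothetical inputs
    for df in files_fetcher(files, batch_size=1, file_parser=parse_file):
        print(df.shape)  # with batch_size=1, each file arrives as its own batch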
- data_fetcher.files_fetcher.list_directory_files(base_path, file_search)[source]¶
A helper function to list all files in a directory that match a given regex (see the sketch after this entry)
- Parameters:
base_path (str) – the directory path
file_search (str) – the regex to search in the directory
- Returns:
list of files matching the regex
- Return type:
List[str]
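A sketch combining list_directory_files with files_fetcher; the directory, the regex, and the parse_file helper (from the previous sketch) are illustrative:

    from ETL_Infra.data_fetcher.files_fetcher import list_directory_files, files_fetcher

    # Hypothetical directory and pattern; file_search is a regex, not a glob.
    files = list_directory_files("/data/raw", r"labs_.*\.csv")
    for df in files_fetcher(files, batch_size=1, file_parser=parse_file):
        print(df.shape)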