Source code for data_fetcher.files_fetcher
import pandas as pd
import os, re
from typing import List, Generator, Callable
# To use with "prepare_final_signals", wrap these fetchers so that the result is a function
# that accepts 2 arguments: (batch_size, start_batch) - see the illustrative sketch after files_fetcher below.
# Basic data fetcher - the data is returned batch by batch.
# files - list of full file paths
# batch_size - batch size in rows/files/your decision. 0 means read all
# file_parser - function that receives (file_path) and returns a parsed dataframe
def files_fetcher(
    files: List[str],
    batch_size: int,
    file_parser: Callable[[str], pd.DataFrame],
    start_batch: int = 0,
) -> Generator[pd.DataFrame, None, None]:
    """
    A helper function to create a data generator from a list of files.
    The batching is done at the level of whole files,
    so "1" means process each file as a separate batch and 0 means no batching (read all files at once).

    :param files: list of file paths to process
    :param batch_size: the batch size in number of files to process
    :param file_parser: a function to "read" each file path into a dataframe with at least a "pid" column
    :param start_batch: starting position of the batch
    :return: a lazy data generator that yields the DataFrames
    """
    curr_data = []
    for full_f in files[start_batch * batch_size :]:
        print("Process file %s" % (full_f), flush=True)
        data = file_parser(full_f)
        curr_data.append(data)
        print("Done reading")
        if (batch_size > 0) and (len(curr_data) >= batch_size):
            data = pd.concat(curr_data, ignore_index=True)
            curr_data = []
            yield data
    # yield whatever is left in the buffer (last partial batch, or everything when batch_size == 0)
    if len(curr_data) > 0:
        data = pd.concat(curr_data, ignore_index=True)
        yield data
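# Illustrative sketch (not part of the original module): the note at the top asks for a
# callable that accepts (batch_size, start_batch), e.g. to hand over to "prepare_final_signals".
# One assumed way to get that is to close over "files" and "file_parser"; the wrapper name and
# the use of pd.read_csv as the default parser are examples only.
def _example_make_files_fetcher(
    files: List[str],
    file_parser: Callable[[str], pd.DataFrame] = pd.read_csv,
) -> Callable[[int, int], Generator[pd.DataFrame, None, None]]:
    def fetch(batch_size: int, start_batch: int = 0) -> Generator[pd.DataFrame, None, None]:
        # delegate to files_fetcher with the pre-bound files list and parser
        return files_fetcher(files, batch_size, file_parser, start_batch)
    return fetch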
def big_files_fetcher(
    files: List[str],
    batch_size: int,
    file_parser: Callable[[str, int, int], Generator[pd.DataFrame, None, None]],
    has_header: bool = True,
    start_batch: int = 0,
) -> Generator[pd.DataFrame, None, None]:
    """
    A helper function to create a data generator from a list of big files.
    The batching is done at the level of #rows, so "1" means process 1 row and 0 means no batching (read all).
    It will read several files until it reaches "batch_size" rows, or part of a single
    file if the file is bigger than "batch_size".

    :param files: list of file paths to process
    :param batch_size: the batch size in number of rows to process
    :param file_parser: a function to "read" each file path into a dataframe with at least a "pid" column.
        The function receives the file path, the number of lines to read from the file (batch_size)
        and how many lines to skip from the file (start_from_row) -
        useful when we want to continue a run from the middle of a big file. Example reading with pandas:
        df_it = pd.read_csv(file_path, skiprows=start_from_row, chunksize=batch_size)
    :param has_header: a flag to indicate if the file contains a header
    :param start_batch: starting position of the batch
    :return: a lazy data generator that yields the DataFrames
    """
    curr_data = []
    curr_size = 0
    start_from_row = start_batch * batch_size
    for full_f in files:
        print("Process file %s" % (full_f), flush=True)
        try:
            data_it = file_parser(full_f, batch_size, start_from_row)
        except pd.errors.EmptyDataError:
            # The whole file falls inside the rows we need to skip - count its lines,
            # subtract them from the remaining offset and continue to the next file.
            with open(full_f, "rb") as f:
                num_lines = sum(1 for _ in f)
            if has_header:
                num_lines = num_lines - 1  # remove header from count
            start_from_row = start_from_row - num_lines
            if start_from_row < 0:  # not supposed to happen
                start_from_row = 0
            continue
        for ii, data in enumerate(data_it):
            curr_data.append(data)
            curr_size += len(data)
            print(
                f"Done reading batch {ii} in file {full_f}, current buffer size {curr_size}"
            )
            if (batch_size > 0) and (curr_size >= batch_size):
                data = pd.concat(curr_data, ignore_index=True)
                curr_data = []
                curr_size = 0
                yield data
        # the remaining offset was consumed inside this file - don't skip rows in the next files
        start_from_row = 0
        print(f"Done processing file {full_f}")
    if len(curr_data) > 0:
        data = pd.concat(curr_data, ignore_index=True)
        yield data
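# Illustrative sketch (not part of the original module): a file_parser that matches the
# (file_path, batch_size, start_from_row) contract described in the docstring above, built
# around the pd.read_csv example it gives. read_csv is called eagerly (not inside a generator)
# so that pd.errors.EmptyDataError surfaces at call time, where the try/except in
# big_files_fetcher can catch it and move on to the next file. As in the docstring example,
# skiprows skips the first start_from_row physical lines of the file; header handling for
# mid-file resumes is deliberately kept as simple as in that example.
def _example_csv_chunk_parser(file_path: str, batch_size: int, start_from_row: int):
    if batch_size > 0:
        # returns a TextFileReader - an iterator of DataFrames with batch_size rows each
        return pd.read_csv(file_path, skiprows=start_from_row, chunksize=batch_size)
    # no batching: wrap the single DataFrame so the caller can still iterate over it
    return iter([pd.read_csv(file_path, skiprows=start_from_row)])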
def list_directory_files(base_path: str, file_search: str) -> List[str]:
    """
    A helper function to list all files in a directory that match a certain regex

    :param base_path: the directory path
    :param file_search: the regex to search for in the directory
    :return: sorted list of full paths of the files that match the regex
    """
    reg_se = re.compile(file_search)
    files = list(filter(lambda x: len(reg_se.findall(x)) > 0, os.listdir(base_path)))
    files = sorted(files)
    files = list(map(lambda x: os.path.join(base_path, x), files))
    return files
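# Illustrative sketch (not part of the original module): wiring list_directory_files into
# files_fetcher. The ".*\.csv$" pattern, the batch size of 2 files and pd.read_csv as the
# parser are assumptions for the example only.
def _example_iterate_directory(base_path: str) -> Generator[pd.DataFrame, None, None]:
    csv_files = list_directory_files(base_path, r".*\.csv$")
    # stream the matched files, concatenating every 2 files into one yielded DataFrame
    yield from files_fetcher(csv_files, batch_size=2, file_parser=pd.read_csv)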
# TODO: split a single big file into batches when it is very large - not needed currently