mlm_insights.core.data_sources package¶
Subpackages¶
- mlm_insights.core.data_sources.exception package
- mlm_insights.core.data_sources.filters package
- Submodules
- mlm_insights.core.data_sources.filters.contains_filter module
- mlm_insights.core.data_sources.filters.date_range_filter module
- mlm_insights.core.data_sources.filters.file_extension_filter module
- mlm_insights.core.data_sources.filters.last_n_days_filter module
- mlm_insights.core.data_sources.filters.oci_object_storage_data_source_filter module
- mlm_insights.core.data_sources.filters.partition_based_date_range_filter module
- mlm_insights.core.data_sources.filters.prefix_filter module
- mlm_insights.core.data_sources.filters.suffix_filter module
- Module contents
- mlm_insights.core.data_sources.utils package
Submodules¶
mlm_insights.core.data_sources.data_source module¶
- class mlm_insights.core.data_sources.data_source.DataSource(type: str, **kwargs: Any)¶
Bases:
ABC
This interface encapsulates the file path and file type. Implementations can accept parameters and build the list of file paths to be read by the readers.
For example, if a reader must load the folder named after today's date, a data source can compute that path.
A data source is an optional component. It can be omitted if the file paths or glob expressions are passed to the readers explicitly.
- fetch(filename: str, **kwargs: Any) Any ¶
This method fetches the contents of a file from the underlying data source.
Parameters¶
- filename:
The canonical path of the file whose raw content the client has to fetch
- kwargs:
Extra keyword arguments.
Returns¶
- Any:
The raw content of the file, in the format accepted by the underlying engine's read method. By default, returns the file path.
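The "today's date" scenario from the class description can be sketched as follows. This is a standalone illustration that uses only the standard library and does not import mlm_insights. The names `SketchDataSource`, `TodayFolderDataSource` and `get_data_location` are hypothetical and are not taken from the library. Only the constructor shape and the default `fetch` behaviour (returning the path) follow the description above.

```python
from abc import ABC, abstractmethod
from datetime import date
from typing import Any, List, Optional


class SketchDataSource(ABC):
    """Hypothetical stand-in for a data source; not the library's class."""

    def __init__(self, type: str, **kwargs: Any) -> None:
        self.type = type

    @abstractmethod
    def get_data_location(self) -> List[str]:
        """Return the file paths or glob expressions a reader should load."""

    def fetch(self, filename: str, **kwargs: Any) -> Any:
        # Mirrors the documented default: hand the path back unchanged.
        return filename


class TodayFolderDataSource(SketchDataSource):
    """Points a reader at the folder named after the current date."""

    def __init__(self, base_location: str, file_type: str,
                 today: Optional[date] = None) -> None:
        super().__init__(type=file_type)
        self.base_location = base_location.rstrip("/")
        # `today` is injectable so the result is reproducible.
        self.today = today or date.today()

    def get_data_location(self) -> List[str]:
        return [f"{self.base_location}/{self.today.isoformat()}/*.{self.type}"]


ds = TodayFolderDataSource("data/input", "csv", today=date(2023, 3, 19))
locations = ds.get_data_location()
print(locations)
```

Keeping the path logic in the data source means the reader only ever receives a list of locations, which is the separation the interface description suggests.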
mlm_insights.core.data_sources.file_url_data_source module¶
- class mlm_insights.core.data_sources.file_url_data_source.FileUrlDataSource(file_path: List[str] | str = '', **kwargs: Any)¶
Bases:
DataSource
This is the default data source used by the Dask Data Reader when no explicit data source is passed. It is not meant to be used directly by users.
Returns¶
- List[str]:
List of files present on the file path in the data location
mlm_insights.core.data_sources.local_date_prefix_data_source module¶
- class mlm_insights.core.data_sources.local_date_prefix_data_source.LocalDatePrefixDataSource(base_location: str, file_type: str, offset: int = -1, date_range: Dict[Any, Any] = {}, **kwargs: Any)¶
Bases:
DataSource
This class implements the OOB data source for retrieving file locations based on a user-provided offset or date range. This set of locations is passed to the reader.
Provide only one of the two, either offset or date range, to calculate the folder location. If both are provided, the date range takes precedence over the offset.
Configuration¶
- base_location: str
The prefix to the folder location
- file_type: str
File format of the input data files, e.g. csv or jsonl.
- date_range: Dict[str, str]
The date range used to build the folder locations, with 'start' and 'end' as keys, e.g.
{'start': '2023-03-18', 'end': '2023-03-19'}
Either date_range or offset must be provided.
- offset: int, default=-1
Number of days before the current date from which to pick up data. For example, offset=1 selects yesterday and offset=2 selects the day before yesterday.
Returns¶
- List[str]:
List of file locations
Example code
# For using date_range
data = {
    "file_type": "csv",
    "date_range": {"start": "2023-03-18", "end": "2023-03-19"}
}
ds = LocalDatePrefixDataSource(base_location, **data)
csv_reader = CSVDaskDataReader(data_source=ds)
# Returns 2 data locations ['<base_location>/2023-03-18/*.csv', '<base_location>/2023-03-19/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

# For using offset
data = {
    "file_type": "csv",
    "offset": 1
}
ds = LocalDatePrefixDataSource(base_location, **data)
csv_reader = CSVDaskDataReader(data_source=ds)
# Returns 1 data location, given today's date is 2023-03-19: ['<base_location>/2023-03-18/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations
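The way an offset or a date range maps to dated folder locations can be sketched without the library. The helper below is hypothetical (`date_prefix_locations` is not part of mlm_insights) and makes two assumptions that the text above does not confirm: a negative offset means "not set", and an offset of 0 would select today. The precedence of the date range over the offset follows the class description.

```python
from datetime import date, timedelta
from typing import Dict, List, Optional


def date_prefix_locations(
    base_location: str,
    file_type: str,
    offset: int = -1,
    date_range: Optional[Dict[str, str]] = None,
    today: Optional[date] = None,
) -> List[str]:
    """Hypothetical helper: build one glob expression per dated folder."""
    today = today or date.today()
    if date_range:
        # A date range takes precedence over offset when both are given.
        start = date.fromisoformat(date_range["start"])
        end = date.fromisoformat(date_range["end"])
        if end < start:
            raise ValueError("'end' must not be earlier than 'start'")
        days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    elif offset >= 0:
        # offset counts days back from today: 1 is yesterday, 2 is two days ago.
        days = [today - timedelta(days=offset)]
    else:
        # Assumption: a negative offset is treated as "no offset provided".
        raise ValueError("provide either a date_range or a non-negative offset")
    base = base_location.rstrip("/")
    return [f"{base}/{day.isoformat()}/*.{file_type}" for day in days]


by_range = date_prefix_locations(
    "/data", "csv", date_range={"start": "2023-03-18", "end": "2023-03-19"}
)
by_offset = date_prefix_locations("/data", "csv", offset=1, today=date(2023, 3, 19))
print(by_range)
print(by_offset)
```

With `/data` as the base location, the two calls reproduce the shapes shown in the example code: two dated globs for the range, and a single glob for yesterday when today is 2023-03-19.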
- mlm_insights.core.data_sources.local_date_prefix_data_source.validate(base_location: str, offset: int, date_range: Dict[Any, Any], file_type: str) None ¶
mlm_insights.core.data_sources.local_file_data_source module¶
- class mlm_insights.core.data_sources.local_file_data_source.LocalFileDataSource(file_path: List[str] | str = '', **kwargs: Any)¶
Bases:
DataSource
This class implements the OOB data source for retrieving file locations based on a file path string, a list of file path strings, or a glob string.
Configuration¶
- file_path: Union[List[str], str]
A file path string, a list of file path strings, or a glob string
Returns¶
- List[str]:
List of files present at the file path on the local file system
Example code
ds = LocalFileDataSource(file_path = 'location/csv/*.csv')
csv_reader = CSVDaskDataReader(data_source=ds)
# Data source will return a list of csv files within the folder location/csv/
actual_df = csv_reader.read(None)  # Reads all the files returned by the LocalFileDataSource
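To make the three accepted forms of `file_path` concrete, here is a minimal standard-library sketch of expanding a string, a list of strings, or a glob string into a list of files. `expand_file_paths` is a hypothetical helper written for this illustration; it is not how the library is known to resolve paths, and the deduplication and sorting are assumptions.

```python
import glob
import os
import tempfile
from typing import List, Union


def expand_file_paths(file_path: Union[List[str], str]) -> List[str]:
    """Hypothetical helper: expand paths or glob patterns into a sorted file list."""
    patterns = [file_path] if isinstance(file_path, str) else list(file_path)
    matches: List[str] = []
    for pattern in patterns:
        # glob.glob returns a plain path unchanged when it exists on disk,
        # and an empty list when nothing matches.
        matches.extend(glob.glob(pattern))
    # Deduplicate in case two patterns overlap, and sort for a stable order.
    return sorted(set(matches))


with tempfile.TemporaryDirectory() as root:
    for name in ("a.csv", "b.csv", "notes.txt"):
        with open(os.path.join(root, name), "w") as handle:
            handle.write("x\n")
    # A single glob string.
    found = [os.path.basename(p)
             for p in expand_file_paths(os.path.join(root, "*.csv"))]
    # A list that mixes a plain path with an overlapping glob.
    both = [os.path.basename(p)
            for p in expand_file_paths([os.path.join(root, "a.csv"),
                                        os.path.join(root, "*.csv")])]

print(found)
print(both)
```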
mlm_insights.core.data_sources.oci_date_prefix_data_source module¶
- class mlm_insights.core.data_sources.oci_date_prefix_data_source.OCIDatePrefixDataSource(bucket_name: str, namespace: str, file_type: str, object_prefix: str, offset: int = -1, date_range: Dict[Any, Any] = {}, storage_options: Dict[str, Any] = {}, **kwargs: Any)¶
Bases:
DataSource
This class implements the OOB data source for retrieving file locations from OCI Object Storage based on a user-provided offset or date range. This set of locations is passed to the reader.
Provide only one of the two, either offset or date range, to calculate the folder location. If both are provided, the date range takes precedence over the offset.
Configuration¶
- bucket_name: str
Name of the bucket
- namespace: str
OCI namespace of the bucket location
- object_prefix: str
Folder path of the data relative to the bucket location. Cannot be empty.
- file_type: str
File format of the input data files, e.g. csv or jsonl.
- date_range: Dict[str, str]
The date range used to build the folder locations, with 'start' and 'end' as keys, e.g.
{'start': '2023-03-18', 'end': '2023-03-19'}
Either date_range or offset must be provided.
- offset: int, default=-1
Number of days before the current date from which to pick up data. For example, offset=1 selects yesterday and offset=2 selects the day before yesterday.
- storage_options: Dict[str, Any]
Authentication options passed to the underlying ocifs client
Returns¶
- List[str]:
List of OCI Object storage file locations
Example code
# For using date_range
data = {
    "bucket_name": "mlm",
    "namespace": "mlm",
    "object_prefix": "mlm",
    "file_type": "csv",
    "date_range": {"start": "2023-03-18", "end": "2023-03-19"}
}
ds = OCIDatePrefixDataSource(**data)
csv_reader = CSVDaskDataReader(data_source=ds)
# Returns 2 data locations ['oci://mlm@mlm/mlm/2023-03-18/*.csv', 'oci://mlm@mlm/mlm/2023-03-19/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

# For using offset
data = {
    "bucket_name": "mlm",
    "namespace": "mlm",
    "object_prefix": "mlm",
    "file_type": "csv",
    "offset": 1
}
ds = OCIDatePrefixDataSource(**data)
csv_reader = CSVDaskDataReader(data_source=ds)
# Returns 1 data location, given today's date is 2023-03-19: ['oci://mlm@mlm/mlm/2023-03-18/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations
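Because the example uses "mlm" for the bucket, the namespace and the prefix, the position of each value in the resulting location is hard to see. The sketch below composes the locations from distinct values, assuming the `oci://<bucket>@<namespace>/<path>` form used by ocifs. `oci_date_locations` is a hypothetical helper; the empty-value checks other than the one for `object_prefix` are assumptions and do not describe the library's `validate` function.

```python
from typing import List


def oci_date_locations(
    bucket_name: str,
    namespace: str,
    object_prefix: str,
    file_type: str,
    dates: List[str],
) -> List[str]:
    """Hypothetical helper: compose oci:// glob expressions for ISO dates."""
    required = (
        ("bucket_name", bucket_name),
        ("namespace", namespace),
        ("object_prefix", object_prefix),
        ("file_type", file_type),
    )
    for label, value in required:
        if not value:
            # Only the object_prefix rule comes from the configuration notes;
            # the other checks are assumptions made for this sketch.
            raise ValueError(f"{label} cannot be empty")
    prefix = object_prefix.strip("/")
    return [
        f"oci://{bucket_name}@{namespace}/{prefix}/{day}/*.{file_type}"
        for day in dates
    ]


locations = oci_date_locations(
    "my-bucket", "my-namespace", "landing/daily", "csv",
    ["2023-03-18", "2023-03-19"],
)
print(locations)
```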
- mlm_insights.core.data_sources.oci_date_prefix_data_source.validate(bucket_name: str, namespace: str, object_prefix: str, offset: int, date_range: Dict[Any, Any], file_type: str) None ¶
mlm_insights.core.data_sources.oci_object_storage_data_source module¶
- class mlm_insights.core.data_sources.oci_object_storage_data_source.OCIObjectStorageDataSource(file_path: List[str] | str = '', storage_options: Dict[str, Any] = {}, **kwargs: Any)¶
Bases:
DataSource
This class implements the OOB Data source for retrieving file locations based on an OCI file path string or list of OCI file path strings or a glob string.
Configuration¶
- file_path: Union[List[str], str]
An OCI file path string, a list of OCI file path strings, or a glob string
Returns¶
- List[str]:
List of files present at the file path in OCI Object Storage
Example code
ds = OCIObjectStorageDataSource(file_path = 'oci://location/csv/*.csv')
csv_reader = CSVDaskDataReader(data_source=ds)
# Data source will return a list of csv files within the OCI Object store location: oci://location/csv/
actual_df = csv_reader.read(None)  # Reads all the files returned by the OCIObjectStorageDataSource
- mlm_insights.core.data_sources.oci_object_storage_data_source.validate(file_path: List[str] | str) None ¶
mlm_insights.core.data_sources.oci_object_storage_file_search_data_source module¶
- class mlm_insights.core.data_sources.oci_object_storage_file_search_data_source.ObjectStorageFileSearchDataSource(file_path: List[str] | str = '', storage_options: Dict[str, Any] = {}, filter_arg: List[Any] = [], **kwargs: Any)¶
Bases:
DataSource
This class implements the ObjectStorageFileSearch data source for retrieving file locations based on an OCI file path string or a list of OCI file path strings, and on filters provided by the user.
Configuration¶
- file_path: Union[List[str], str]
An OCI file path string, a list of OCI file path strings, or a glob string
- filter_arg: List[Any]
A list of filter arguments
Returns¶
- List[str]:
List of files present at the file path in OCI Object Storage that match the filters
Example code
data = [
    {"contains": "iris_dataset"},
    {"date_range": {"start": "2024-01-01", "end": "2024-01-09", "date_format": "yyyy-mm-dd", "search_type": "metadata"}},
    {"filetype": "csv"},
    {"suffix": "iris.csv"}
]
base_locations = 'oci://location/csv/*.csv'
# The filters are passed through the filter_arg parameter of the constructor
ds = ObjectStorageFileSearchDataSource(file_path=base_locations, filter_arg=data)
csv_reader = CSVNativeDataReader(data_source=ds)
# Data source will return the csv files within the OCI Object store location oci://location/csv/ that match the filters
actual_df = csv_reader.read(None)  # Reads all the files returned by the ObjectStorageFileSearchDataSource
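The effect of a filter list on a set of object names can be sketched as follows. This is an illustration only: `PREDICATES` and `apply_filters` are hypothetical, the filters are assumed to be combined with a logical AND, and the `date_range` filter is left out because it depends on object metadata that a plain list of names does not carry. The filter keys are the ones used in the example above.

```python
from typing import Any, Callable, Dict, List

# Hypothetical predicates keyed by filter names from the example above.
PREDICATES: Dict[str, Callable[[str, Any], bool]] = {
    "contains": lambda name, value: value in name,
    "suffix": lambda name, value: name.endswith(value),
    "filetype": lambda name, value: name.lower().endswith("." + value.lower()),
}


def apply_filters(names: List[str], filter_arg: List[Dict[str, Any]]) -> List[str]:
    """Keep only the names that satisfy every filter (assumed AND semantics)."""
    checks = [(key, value) for spec in filter_arg for key, value in spec.items()]
    unknown = [key for key, _ in checks if key not in PREDICATES]
    if unknown:
        raise ValueError(f"unsupported filters in this sketch: {unknown}")
    return [
        name for name in names
        if all(PREDICATES[key](name, value) for key, value in checks)
    ]


objects = [
    "oci://bucket@ns/csv/iris_dataset_2024_iris.csv",
    "oci://bucket@ns/csv/iris_dataset_notes.txt",
    "oci://bucket@ns/csv/wine_dataset_iris.csv",
]
filters = [{"contains": "iris_dataset"}, {"filetype": "csv"}, {"suffix": "iris.csv"}]
selected = apply_filters(objects, filters)
print(selected)
```

Only the first object passes all three checks: the second is not a csv file and the third does not contain "iris_dataset".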
- get_client(**kwargs: Any) OCIFileSystem ¶
Parameters¶
- kwargs:
Extra keyword arguments
Returns¶
- object_storage_client: ocifs.OCIFileSystem
Object store client