Store
Module containing the core Store definition
DateTimeFormat (Enum)
¶
Datetime format in store document
Source code in maggma/core/store.py
class DateTimeFormat(Enum):
    """Format used for the last-updated timestamp in store documents.

    ``Store.__init__`` accepts either a member of this enum or its string
    value ("datetime" / "isoformat") as ``last_updated_type``.
    """

    # Timestamps kept as datetime values.
    DateTime = "datetime"
    # Timestamps kept in "isoformat" form (presumably ISO-8601 strings).
    IsoFormat = "isoformat"
Sort (Enum)
¶
Enumeration for sorting order
Source code in maggma/core/store.py
class Sort(Enum):
    """Sort direction usable as a value in ``sort`` dictionaries.

    The values follow the numeric convention of the ``sort`` arguments:
    1 means ascending, -1 means descending.
    """

    Ascending = 1
    Descending = -1
Store (MSONable)
¶
Abstract class for a data Store. Defines the interface for all data going in and out of a Builder.
Source code in maggma/core/store.py
class Store(MSONable, metaclass=ABCMeta):
    """
    Abstract class for a data Store.

    Defines the interface for all data going in and out of a Builder.
    Subclasses implement the abstract methods (connection handling, querying,
    updating, indexing, grouping and removal); the concrete helpers defined
    here (query_one, distinct, last_updated, newer_in, ...) are built on them.
    """

    def __init__(
        self,
        key: str = "task_id",
        last_updated_field: str = "last_updated",
        last_updated_type: DateTimeFormat = DateTimeFormat("datetime"),
        validator: Optional[Validator] = None,
    ):
        """
        Args:
            key: main key to index on
            last_updated_field: field for date/time stamping the data
            last_updated_type: the date/time format for the last_updated_field.
                Can be "datetime" or "isoformat"
            validator: Validator to validate documents going into the store
        """
        self.key = key
        self.last_updated_field = last_updated_field
        self.last_updated_type = last_updated_type
        # Converter pair for the last_updated field: [0] turns a stored value
        # into a datetime (used by last_updated), [1] turns a datetime back
        # into the stored representation (used when building "$gt" filters).
        self._lu_func = (
            LU_KEY_ISOFORMAT
            if DateTimeFormat(last_updated_type) == DateTimeFormat.IsoFormat
            else (identity, identity)
        )  # type: Tuple[Callable, Callable]
        self.validator = validator
        self.logger = logging.getLogger(type(self).__name__)
        # A NullHandler avoids "no handler" output when the application has
        # not configured logging.
        self.logger.addHandler(logging.NullHandler())

    @abstractproperty
    def _collection(self):
        """
        Returns a handle to the pymongo collection object
        """

    @abstractproperty
    def name(self) -> str:
        """
        Return a string representing this data source
        """

    @abstractmethod
    def connect(self, force_reset: bool = False):
        """
        Connect to the source data

        Args:
            force_reset: whether to reset the connection or not
        """

    @abstractmethod
    def close(self):
        """
        Closes any connections
        """

    @abstractmethod
    def count(self, criteria: Optional[Dict] = None) -> int:
        """
        Counts the number of documents matching the query criteria

        Args:
            criteria: PyMongo filter for the documents to count
        """

    @abstractmethod
    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        """
        Queries the Store for a set of documents

        Args:
            criteria: PyMongo filter for documents to search in
            properties: properties to return in the queried documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned
        """

    @abstractmethod
    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        """
        Update documents into the Store

        Args:
            docs: the document or list of documents to update
            key: field name(s) to determine uniqueness for a
                document, can be a list of multiple fields,
                a single field, or None if the Store's key
                field is to be used
        """

    @abstractmethod
    def ensure_index(self, key: str, unique: bool = False) -> bool:
        """
        Tries to create an index and returns True if it succeeded

        Args:
            key: single key to index
            unique: Whether or not this index contains only unique keys

        Returns:
            bool indicating if the index exists/was created
        """

    @abstractmethod
    def groupby(
        self,
        keys: Union[List[str], str],
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Tuple[Dict, List[Dict]]]:
        """
        Simple grouping function that will group documents
        by keys.

        Args:
            keys: fields to group documents
            criteria: PyMongo filter for documents to search in
            properties: properties to return in grouped documents
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.
            skip: number documents to skip
            limit: limit on total number of documents returned

        Returns:
            generator returning tuples of (dict, list of docs)
        """

    @abstractmethod
    def remove_docs(self, criteria: Dict):
        """
        Remove docs matching the query dictionary

        Args:
            criteria: query dictionary to match
        """

    def query_one(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
    ):
        """
        Queries the Store for a single document

        Args:
            criteria: PyMongo filter for documents to search
            properties: properties to return in the document
            sort: Dictionary of sort order for fields. Keys are field names and
                values are 1 for ascending or -1 for descending.

        Returns:
            the first document yielded by query(), or None if there is none
        """
        return next(
            self.query(criteria=criteria, properties=properties, sort=sort), None
        )

    def distinct(
        self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
    ) -> List:
        """
        Get all distinct values for a field

        Args:
            field: the field(s) to get distinct values for
            criteria: PyMongo filter for documents to search in
            all_exist: not used by this default implementation

        Returns:
            one value per group produced by groupby() on ``field``
        """
        criteria = criteria or {}
        # Each groupby key dict holds one distinct value of ``field``.
        results = [
            key for key, _ in self.groupby(field, properties=[field], criteria=criteria)
        ]
        results = [get(r, field) for r in results]
        return results

    @property
    def last_updated(self) -> datetime:
        """
        Provides the most recent last_updated date time stamp from
        the documents in this Store

        Returns datetime.min when the Store is empty or the newest document's
        last_updated field is None.

        Raises:
            StoreError: if the newest document lacks the last_updated field
        """
        doc = next(
            self.query(
                properties=[self.last_updated_field],
                sort={self.last_updated_field: -1},
                limit=1,
            ),
            None,
        )
        if doc and not has(doc, self.last_updated_field):
            raise StoreError(
                f"No field '{self.last_updated_field}' in store document. Please ensure Store.last_updated_field "
                "is a datetime field in your store that represents the time of "
                "last update to each document."
            )
        elif not doc or get(doc, self.last_updated_field) is None:
            # Handle when collection has docs but `NoneType` last_updated_field.
            return datetime.min
        else:
            return self._lu_func[0](get(doc, self.last_updated_field))

    def newer_in(
        self, target: "Store", criteria: Optional[Dict] = None, exhaustive: bool = False
    ) -> List[str]:
        """
        Returns the keys of documents that are newer in the target
        Store than this Store.

        Args:
            target: target Store to compare against
            criteria: PyMongo filter restricting which target documents are
                considered
            exhaustive: if True, compare last_updated values key by key;
                otherwise return every target key whose last_updated is newer
                than the most recent last_updated of this Store

        Returns:
            list of target key values that are new or newer in ``target``
        """
        self.ensure_index(self.key)
        self.ensure_index(self.last_updated_field)

        if exhaustive:
            # Our own last_updated per key; documents missing the field get
            # datetime.max so they never count as outdated.
            # NOTE(review): datetime.max is also passed through _lu_func[0];
            # confirm the isoformat converter tolerates a datetime input.
            props = {self.key: 1, self.last_updated_field: 1, "_id": 0}
            dates = {
                d[self.key]: self._lu_func[0](
                    d.get(self.last_updated_field, datetime.max)
                )
                for d in self.query(properties=props)
            }

            # The target's last_updated per key, restricted by ``criteria``.
            props = {target.key: 1, target.last_updated_field: 1, "_id": 0}
            target_dates = {
                d[target.key]: target._lu_func[0](
                    d.get(target.last_updated_field, datetime.min)
                )
                for d in target.query(criteria=criteria, properties=props)
            }

            new_keys = set(target_dates.keys()) - set(dates.keys())
            updated_keys = {
                key
                for key, date in dates.items()
                if target_dates.get(key, datetime.min) > date
            }
            return list(new_keys | updated_keys)

        # The filter is evaluated against the *target* store, so it uses the
        # target's field name and storage format. The caller's criteria are
        # combined with it instead of being discarded.
        lu_criteria = {
            target.last_updated_field: {"$gt": target._lu_func[1](self.last_updated)}
        }
        full_criteria = {"$and": [criteria, lu_criteria]} if criteria else lu_criteria
        return target.distinct(field=target.key, criteria=full_criteria)

    @deprecated(message="Please use Store.newer_in")
    def lu_filter(self, targets):
        """Creates a MongoDB filter for new documents.

        By "new", we mean documents in this Store that were last updated later
        than any document in targets.

        Args:
            targets (list): A list of Stores (a single Store is also accepted)
        """
        if isinstance(targets, Store):
            targets = [targets]

        lu_list = [t.last_updated for t in targets]
        return {self.last_updated_field: {"$gt": self._lu_func[1](max(lu_list))}}

    @deprecated(message="Use Store.newer_in")
    def updated_keys(self, target, criteria=None):
        """
        Returns keys for docs that are newer in the target store in comparison
        with this store when comparing the last updated field (last_updated_field)

        Args:
            target (Store): store to look for updated documents
            criteria (dict): mongo query to limit scope

        Returns:
            list of keys that have been updated in target store
        """
        self.ensure_index(self.key)
        self.ensure_index(self.last_updated_field)

        return self.newer_in(target, criteria=criteria)

    def __ne__(self, other):
        return not self == other

    def __getstate__(self):
        # Pickle via the MSON dict representation.
        return self.as_dict()

    def __setstate__(self, d):
        # Drop "@"-prefixed keys (presumably MSON metadata such as
        # "@module"/"@class"), decode nested objects, then rebuild the
        # instance by re-running __init__ with the remaining arguments.
        d = {k: v for k, v in d.items() if not k.startswith("@")}
        d = MontyDecoder().process_decoded(d)
        self.__init__(**d)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.close()
last_updated: datetime
property
readonly
¶
Provides the most recent last_updated date time stamp from the documents in this Store
name: str
property
readonly
¶
Return a string representing this data source
__init__(self, key='task_id', last_updated_field='last_updated', last_updated_type=<DateTimeFormat.DateTime: 'datetime'>, validator=None)
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
main key to index on |
'task_id' |
last_updated_field |
str |
field for date/time stamping the data |
'last_updated' |
last_updated_type |
DateTimeFormat |
the date/time format for the last_updated_field. Can be "datetime" or "isoformat" |
<DateTimeFormat.DateTime: 'datetime'> |
validator |
Optional[maggma.core.validator.Validator] |
Validator to validate documents going into the store |
None |
Source code in maggma/core/store.py
def __init__(
    self,
    key: str = "task_id",
    last_updated_field: str = "last_updated",
    last_updated_type: DateTimeFormat = DateTimeFormat("datetime"),
    validator: Optional[Validator] = None,
):
    """
    Initialise the common Store configuration.

    Args:
        key: main key to index on
        last_updated_field: field for date/time stamping the data
        last_updated_type: the date/time format for the last_updated_field.
            Can be "datetime" or "isoformat"
        validator: Validator to validate documents going into the store
    """
    self.key = key
    self.last_updated_field = last_updated_field
    self.last_updated_type = last_updated_type

    # Choose the (to-datetime, from-datetime) converter pair for timestamps.
    uses_isoformat = DateTimeFormat(last_updated_type) == DateTimeFormat.IsoFormat
    if uses_isoformat:
        converters = LU_KEY_ISOFORMAT
    else:
        converters = (identity, identity)
    self._lu_func = converters  # type: Tuple[Callable, Callable]

    self.validator = validator

    # One logger per concrete Store class, silent unless configured.
    self.logger = logging.getLogger(type(self).__name__)
    self.logger.addHandler(logging.NullHandler())
close(self)
¶
Closes any connections
Source code in maggma/core/store.py
@abstractmethod
def close(self):
    """
    Release every connection held by this Store.
    """
connect(self, force_reset=False)
¶
Connect to the source data
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force_reset |
bool |
whether to reset the connection or not |
False |
Source code in maggma/core/store.py
@abstractmethod
def connect(self, force_reset: bool = False):
    """
    Open the connection to the underlying data source.

    Args:
        force_reset: when True, reset the connection instead of reusing it
    """
count(self, criteria=None)
¶
Counts the number of documents matching the query criteria
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria |
Optional[Dict] |
PyMongo filter for the documents to count |
None |
Source code in maggma/core/store.py
@abstractmethod
def count(self, criteria: Optional[Dict] = None) -> int:
    """
    Return how many documents match the given query.

    Args:
        criteria: PyMongo filter selecting the documents to count
    """
distinct(self, field, criteria=None, all_exist=False)
¶
Get all distinct values for a field
Parameters:
Name | Type | Description | Default |
---|---|---|---|
field |
str |
the field(s) to get distinct values for |
required |
criteria |
Optional[Dict] |
PyMongo filter for documents to search in |
None |
Source code in maggma/core/store.py
def distinct(
    self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
    """
    Collect the distinct values of a field by grouping on it.

    Args:
        field: the field(s) to get distinct values for
        criteria: PyMongo filter for documents to search in
        all_exist: not used by this default implementation

    Returns:
        one value per group produced by groupby() on ``field``
    """
    groups = self.groupby(field, properties=[field], criteria=criteria or {})
    # Each group's key dict carries one distinct value of ``field``.
    group_keys = [group_key for group_key, _docs in groups]
    return [get(group_key, field) for group_key in group_keys]
ensure_index(self, key, unique=False)
¶
Tries to create an index and returns True if it succeeded
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
single key to index |
required |
unique |
bool |
Whether or not this index contains only unique keys |
False |
Returns:
Type | Description |
---|---|
bool |
bool indicating if the index exists/was created |
Source code in maggma/core/store.py
@abstractmethod
def ensure_index(self, key: str, unique: bool = False) -> bool:
    """
    Make sure an index on ``key`` exists, creating it if needed.

    Args:
        key: single key to index
        unique: whether the index should only allow unique values

    Returns:
        True if the index exists or was created successfully
    """
groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Simple grouping function that will group documents by keys.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys |
Union[List[str], str] |
fields to group documents |
required |
criteria |
Optional[Dict] |
PyMongo filter for documents to search in |
None |
properties |
Union[Dict, List] |
properties to return in grouped documents |
None |
sort |
Optional[Dict[str, Union[maggma.core.store.Sort, int]]] |
Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. |
None |
skip |
int |
number documents to skip |
0 |
limit |
int |
limit on total number of documents returned |
0 |
Returns:
Type | Description |
---|---|
Iterator[Tuple[Dict, List[Dict]]] |
generator returning tuples of (dict, list of docs) |
Source code in maggma/core/store.py
@abstractmethod
def groupby(
    self,
    keys: Union[List[str], str],
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
    """
    Group the matching documents by one or more fields.

    Args:
        keys: field name, or list of field names, to group on
        criteria: PyMongo filter selecting the documents to group
        properties: properties to keep in the grouped documents
        sort: mapping of field name to sort order (1 ascending, -1 descending)
        skip: number of documents to skip
        limit: maximum number of documents returned

    Returns:
        an iterator of (group key dict, list of documents) tuples
    """
newer_in(self, target, criteria=None, exhaustive=False)
¶
Returns the keys of documents that are newer in the target Store than this Store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target |
Store |
target Store to compare against |
required |
criteria |
Optional[Dict] |
PyMongo filter for documents to search in |
None |
exhaustive |
bool |
if True, compares last_updated values item by item; otherwise, the most recent last_updated of this Store is used to filter for newer items in the target Store |
False |
Source code in maggma/core/store.py
def newer_in(
    self, target: "Store", criteria: Optional[Dict] = None, exhaustive: bool = False
) -> List[str]:
    """
    Returns the keys of documents that are newer in the target
    Store than this Store.

    Args:
        target: target Store to compare against
        criteria: PyMongo filter restricting which target documents are
            considered
        exhaustive: if True, compare last_updated values key by key;
            otherwise return every target key whose last_updated is newer
            than the most recent last_updated of this Store

    Returns:
        list of target key values that are new or newer in ``target``
    """
    self.ensure_index(self.key)
    self.ensure_index(self.last_updated_field)

    if exhaustive:
        # Our own last_updated per key; documents missing the field get
        # datetime.max so they never count as outdated.
        # NOTE(review): datetime.max is also passed through _lu_func[0];
        # confirm the isoformat converter tolerates a datetime input.
        props = {self.key: 1, self.last_updated_field: 1, "_id": 0}
        dates = {
            d[self.key]: self._lu_func[0](
                d.get(self.last_updated_field, datetime.max)
            )
            for d in self.query(properties=props)
        }

        # The target's last_updated per key, restricted by ``criteria``.
        props = {target.key: 1, target.last_updated_field: 1, "_id": 0}
        target_dates = {
            d[target.key]: target._lu_func[0](
                d.get(target.last_updated_field, datetime.min)
            )
            for d in target.query(criteria=criteria, properties=props)
        }

        new_keys = set(target_dates.keys()) - set(dates.keys())
        updated_keys = {
            key
            for key, date in dates.items()
            if target_dates.get(key, datetime.min) > date
        }
        return list(new_keys | updated_keys)

    # The filter is evaluated against the *target* store, so it uses the
    # target's field name and storage format. The caller's criteria are
    # combined with it instead of being discarded.
    lu_criteria = {
        target.last_updated_field: {"$gt": target._lu_func[1](self.last_updated)}
    }
    full_criteria = {"$and": [criteria, lu_criteria]} if criteria else lu_criteria
    return target.distinct(field=target.key, criteria=full_criteria)
query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Queries the Store for a set of documents
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria |
Optional[Dict] |
PyMongo filter for documents to search in |
None |
properties |
Union[Dict, List] |
properties to return in grouped documents |
None |
sort |
Optional[Dict[str, Union[maggma.core.store.Sort, int]]] |
Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. |
None |
skip |
int |
number documents to skip |
0 |
limit |
int |
limit on total number of documents returned |
0 |
Source code in maggma/core/store.py
@abstractmethod
def query(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
    skip: int = 0,
    limit: int = 0,
) -> Iterator[Dict]:
    """
    Yield the documents of this Store that match a query.

    Args:
        criteria: PyMongo filter selecting the documents
        properties: properties to return in each matching document
        sort: mapping of field name to sort order (1 ascending, -1 descending)
        skip: number of documents to skip
        limit: maximum number of documents returned
    """
query_one(self, criteria=None, properties=None, sort=None)
¶
Queries the Store for a single document
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria |
Optional[Dict] |
PyMongo filter for documents to search |
None |
properties |
Union[Dict, List] |
properties to return in the document |
None |
sort |
Optional[Dict[str, Union[maggma.core.store.Sort, int]]] |
Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. |
None |
Source code in maggma/core/store.py
def query_one(
    self,
    criteria: Optional[Dict] = None,
    properties: Union[Dict, List, None] = None,
    sort: Optional[Dict[str, Union[Sort, int]]] = None,
):
    """
    Fetch a single document from the Store.

    Args:
        criteria: PyMongo filter selecting the document
        properties: properties to return in the document
        sort: mapping of field name to sort order (1 ascending, -1 descending)

    Returns:
        the first document produced by query(), or None if it yields nothing
    """
    matches = self.query(criteria=criteria, properties=properties, sort=sort)
    first_match = next(matches, None)
    return first_match
remove_docs(self, criteria)
¶
Remove docs matching the query dictionary
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria |
Dict |
query dictionary to match |
required |
Source code in maggma/core/store.py
@abstractmethod
def remove_docs(self, criteria: Dict):
    """
    Delete every document that matches a query.

    Args:
        criteria: query dictionary selecting the documents to delete
    """
update(self, docs, key=None)
¶
Update documents into the Store
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docs |
Union[List[Dict], Dict] |
the document or list of documents to update |
required |
key |
Union[List, str] |
field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used |
None |
Source code in maggma/core/store.py
@abstractmethod
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
    """
    Write one or more documents into the Store.

    Args:
        docs: a single document or a list of documents
        key: field name, or list of field names, that identifies a document
            uniquely; None means the Store's own key field is used
    """
StoreError (Exception)
¶
General Store-related error
Source code in maggma/core/store.py
class StoreError(Exception):
    """General Store-related error"""

    def __init__(self, *args, **kwargs):
        # ``super().__init__`` is already bound to this instance. Passing
        # ``self`` explicitly again would put the exception object into its
        # own ``args`` and garble ``str(err)`` / ``repr(err)``.
        super().__init__(*args, **kwargs)