Stores
Module containing various definitions of Stores. Stores provide a default access pattern to data along with various utilities.
JSONStore (MemoryStore)
¶
A Store for access to a single or multiple JSON files
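A minimal read-only usage sketch (the file name my_docs.json and the task_id key below are illustrative and assume the file already exists with documents containing that field):

from maggma.stores import JSONStore

# read_only=True (the default): documents are loaded into an in-memory
# collection when connect() is called; writes are not persisted to the file
store = JSONStore("my_docs.json", key="task_id")
store.connect()
print(store.count())
for doc in store.query(criteria={"task_id": {"$exists": True}}):
    print(doc["task_id"])
store.close()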
Source code in maggma/stores/mongolike.py
class JSONStore(MemoryStore):
"""
A Store for access to a single or multiple JSON files
"""
def __init__(
self,
paths: Union[str, List[str]],
read_only: bool = True,
serialization_option: Optional[int] = None,
serialization_default: Any = None,
**kwargs,
):
"""
Args:
paths: paths for json files to turn into a Store
read_only: whether this JSONStore is read only. When read_only=True,
the JSONStore can still apply MongoDB-like writable operations
(e.g. an update) because it behaves like a MemoryStore,
but it will not write those changes to the file. On the other hand,
if read_only=False (i.e., it is writeable), the JSON file
will be automatically updated every time a write-like operation is
performed.
Note that when read_only=False, JSONStore only supports a single JSON
file. If the file does not exist, it will be automatically created
when the JSONStore is initialized.
serialization_option:
option that will be passed to the orjson.dump when saving to the json the file.
serialization_default:
default that will be passed to the orjson.dump when saving to the json the file.
"""
paths = paths if isinstance(paths, (list, tuple)) else [paths]
self.paths = paths
# file_writable overrides read_only for compatibility reasons
if "file_writable" in kwargs:
file_writable = kwargs.pop("file_writable")
warnings.warn(
"file_writable is deprecated; use read only instead.",
DeprecationWarning,
)
self.read_only = not file_writable
if self.read_only != read_only:
warnings.warn(
f"Received conflicting keyword arguments file_writable={file_writable}"
f" and read_only={read_only}. Setting read_only={file_writable}.",
UserWarning,
)
else:
self.read_only = read_only
self.kwargs = kwargs
if not self.read_only and len(paths) > 1:
raise RuntimeError(
"Cannot instantiate file-writable JSONStore with multiple JSON files."
)
# create the .json file if it does not exist
if not self.read_only and not Path(self.paths[0]).exists():
with zopen(self.paths[0], "w") as f:
data: List[dict] = []
bytesdata = orjson.dumps(data)
f.write(bytesdata.decode("utf-8"))
self.default_sort = None
self.serialization_option = serialization_option
self.serialization_default = serialization_default
super().__init__(**kwargs)
def connect(self, force_reset=False):
"""
Loads the files into the collection in memory
"""
super().connect(force_reset=force_reset)
for path in self.paths:
objects = self.read_json_file(path)
try:
self.update(objects)
except KeyError:
raise KeyError(
f"""
Key field '{self.key}' not found in {path.name}. This
could mean that this JSONStore was initially created with a different key field.
The keys found in the .json file are {list(objects[0].keys())}. Try
re-initializing your JSONStore using one of these as the key arguments.
"""
)
def read_json_file(self, path) -> List:
"""
Helper method to read the contents of a JSON file and generate
a list of docs.
Args:
path: Path to the JSON file to be read
"""
with zopen(path) as f:
data = f.read()
data = data.decode() if isinstance(data, bytes) else data
objects = orjson.loads(data)
objects = [objects] if not isinstance(objects, list) else objects
# datetime objects deserialize to str. Try to convert the last_updated
# field back to datetime.
# # TODO - there may still be problems caused if a JSONStore is init'ed from
# documents that don't contain a last_updated field
# See Store.last_updated in store.py.
for obj in objects:
if obj.get(self.last_updated_field):
obj[self.last_updated_field] = to_dt(obj[self.last_updated_field])
return objects
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store.
For a file-writable JSONStore, the json file is updated.
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
super().update(docs=docs, key=key)
if not self.read_only:
self.update_json_file()
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary.
For a file-writable JSONStore, the json file is updated.
Args:
criteria: query dictionary to match
"""
super().remove_docs(criteria=criteria)
if not self.read_only:
self.update_json_file()
def update_json_file(self):
"""
Updates the json file when a write-like operation is performed.
"""
with zopen(self.paths[0], "w") as f:
data = [d for d in self.query()]
for d in data:
d.pop("_id")
bytesdata = orjson.dumps(
data,
option=self.serialization_option,
default=self.serialization_default,
)
f.write(bytesdata.decode("utf-8"))
def __hash__(self):
return hash((*self.paths, self.last_updated_field))
def __eq__(self, other: object) -> bool:
"""
Check equality for JSONStore
Args:
other: other JSONStore to compare with
"""
if not isinstance(other, JSONStore):
return False
fields = ["paths", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__eq__(self, other)
special
¶
Check equality for JSONStore
Parameters:
Name | Type | Description | Default
---|---|---|---
other | object | other JSONStore to compare with | required
Source code in maggma/stores/mongolike.py
def __eq__(self, other: object) -> bool:
"""
Check equality for JSONStore
Args:
other: other JSONStore to compare with
"""
if not isinstance(other, JSONStore):
return False
fields = ["paths", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__init__(self, paths, read_only=True, serialization_option=None, serialization_default=None, **kwargs)
special
¶
Parameters:
Name | Type | Description | Default
---|---|---|---
paths | Union[str, List[str]] | paths for json files to turn into a Store | required
read_only | bool | whether this JSONStore is read only. When read_only=True, the JSONStore can still apply MongoDB-like writable operations (e.g. an update) because it behaves like a MemoryStore, but it will not write those changes to the file. If read_only=False (i.e., it is writeable), the JSON file will be automatically updated every time a write-like operation is performed. Note that when read_only=False, JSONStore only supports a single JSON file; if the file does not exist, it will be created when the JSONStore is initialized. | True
serialization_option | Optional[int] | option that will be passed to orjson.dumps when saving to the JSON file | None
serialization_default | Any | default that will be passed to orjson.dumps when saving to the JSON file | None
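A sketch of the writable mode described above (single file only; the file name and key are illustrative):

from maggma.stores import JSONStore

# read_only=False: the file is created if it does not exist and is rewritten
# after every write-like operation (update, remove_docs)
store = JSONStore("output.json", read_only=False, key="task_id")
store.connect()
store.update({"task_id": 1, "status": "done"})  # also persists to output.json
store.close()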
Source code in maggma/stores/mongolike.py
def __init__(
self,
paths: Union[str, List[str]],
read_only: bool = True,
serialization_option: Optional[int] = None,
serialization_default: Any = None,
**kwargs,
):
"""
Args:
paths: paths for json files to turn into a Store
read_only: whether this JSONStore is read only. When read_only=True,
the JSONStore can still apply MongoDB-like writable operations
(e.g. an update) because it behaves like a MemoryStore,
but it will not write those changes to the file. On the other hand,
if read_only=False (i.e., it is writeable), the JSON file
will be automatically updated every time a write-like operation is
performed.
Note that when read_only=False, JSONStore only supports a single JSON
file. If the file does not exist, it will be automatically created
when the JSONStore is initialized.
serialization_option:
option that will be passed to the orjson.dump when saving to the json the file.
serialization_default:
default that will be passed to the orjson.dump when saving to the json the file.
"""
paths = paths if isinstance(paths, (list, tuple)) else [paths]
self.paths = paths
# file_writable overrides read_only for compatibility reasons
if "file_writable" in kwargs:
file_writable = kwargs.pop("file_writable")
warnings.warn(
"file_writable is deprecated; use read only instead.",
DeprecationWarning,
)
self.read_only = not file_writable
if self.read_only != read_only:
warnings.warn(
f"Received conflicting keyword arguments file_writable={file_writable}"
f" and read_only={read_only}. Setting read_only={file_writable}.",
UserWarning,
)
else:
self.read_only = read_only
self.kwargs = kwargs
if not self.read_only and len(paths) > 1:
raise RuntimeError(
"Cannot instantiate file-writable JSONStore with multiple JSON files."
)
# create the .json file if it does not exist
if not self.read_only and not Path(self.paths[0]).exists():
with zopen(self.paths[0], "w") as f:
data: List[dict] = []
bytesdata = orjson.dumps(data)
f.write(bytesdata.decode("utf-8"))
self.default_sort = None
self.serialization_option = serialization_option
self.serialization_default = serialization_default
super().__init__(**kwargs)
connect(self, force_reset=False)
¶
Loads the files into the collection in memory
Source code in maggma/stores/mongolike.py
def connect(self, force_reset=False):
"""
Loads the files into the collection in memory
"""
super().connect(force_reset=force_reset)
for path in self.paths:
objects = self.read_json_file(path)
try:
self.update(objects)
except KeyError:
raise KeyError(
f"""
Key field '{self.key}' not found in {path.name}. This
could mean that this JSONStore was initially created with a different key field.
The keys found in the .json file are {list(objects[0].keys())}. Try
re-initializing your JSONStore using one of these as the key arguments.
"""
)
read_json_file(self, path)
¶
Helper method to read the contents of a JSON file and generate a list of docs.
Parameters:
Name | Type | Description | Default
---|---|---|---
path | | Path to the JSON file to be read | required
Source code in maggma/stores/mongolike.py
def read_json_file(self, path) -> List:
"""
Helper method to read the contents of a JSON file and generate
a list of docs.
Args:
path: Path to the JSON file to be read
"""
with zopen(path) as f:
data = f.read()
data = data.decode() if isinstance(data, bytes) else data
objects = orjson.loads(data)
objects = [objects] if not isinstance(objects, list) else objects
# datetime objects deserialize to str. Try to convert the last_updated
# field back to datetime.
# # TODO - there may still be problems caused if a JSONStore is init'ed from
# documents that don't contain a last_updated field
# See Store.last_updated in store.py.
for obj in objects:
if obj.get(self.last_updated_field):
obj[self.last_updated_field] = to_dt(obj[self.last_updated_field])
return objects
remove_docs(self, criteria)
¶
Remove docs matching the query dictionary.
For a file-writable JSONStore, the json file is updated.
Parameters:
Name | Type | Description | Default
---|---|---|---
criteria | Dict | query dictionary to match | required
Source code in maggma/stores/mongolike.py
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary.
For a file-writable JSONStore, the json file is updated.
Args:
criteria: query dictionary to match
"""
super().remove_docs(criteria=criteria)
if not self.read_only:
self.update_json_file()
update(self, docs, key=None)
¶
Update documents into the Store.
For a file-writable JSONStore, the json file is updated.
Parameters:
Name | Type | Description | Default
---|---|---|---
docs | Union[List[Dict], Dict] | the document or list of documents to update | required
key | Union[List, str] | field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used | None
Source code in maggma/stores/mongolike.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store.
For a file-writable JSONStore, the json file is updated.
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
super().update(docs=docs, key=key)
if not self.read_only:
self.update_json_file()
update_json_file(self)
¶
Updates the json file when a write-like operation is performed.
Source code in maggma/stores/mongolike.py
def update_json_file(self):
"""
Updates the json file when a write-like operation is performed.
"""
with zopen(self.paths[0], "w") as f:
data = [d for d in self.query()]
for d in data:
d.pop("_id")
bytesdata = orjson.dumps(
data,
option=self.serialization_option,
default=self.serialization_default,
)
f.write(bytesdata.decode("utf-8"))
MemoryStore (MongoStore)
¶
An in-memory Store that functions similarly to a MongoStore
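A short sketch of using a MemoryStore as a throwaway, Mongo-like collection backed by mongomock (the collection and field names are illustrative):

from maggma.stores import MemoryStore

store = MemoryStore(collection_name="scratch", key="task_id")
store.connect()
store.update([{"task_id": i, "energy": -0.1 * i} for i in range(3)])
print(store.distinct("task_id"))  # e.g. [0, 1, 2]; ordering is not guaranteed
store.close()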
Source code in maggma/stores/mongolike.py
class MemoryStore(MongoStore):
"""
An in-memory Store that functions similarly
to a MongoStore
"""
def __init__(self, collection_name: str = "memory_db", **kwargs):
"""
Initializes the Memory Store
Args:
collection_name: name for the collection in memory
"""
self.collection_name = collection_name
self.default_sort = None
self._coll = None
self.kwargs = kwargs
super(MongoStore, self).__init__(**kwargs) # noqa
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if self._coll is None or force_reset:
self._coll = mongomock.MongoClient().db[self.name] # type: ignore
def close(self):
"""Close up all collections"""
self._coll.database.client.close()
@property
def name(self):
"""Name for the store"""
return f"mem://{self.collection_name}"
def __hash__(self):
"""Hash for the store"""
return hash((self.name, self.last_updated_field))
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (key, list of elements)
"""
keys = keys if isinstance(keys, list) else [keys]
if properties is None:
properties = []
if isinstance(properties, dict):
properties = list(properties.keys())
data = [
doc
for doc in self.query(properties=keys + properties, criteria=criteria)
if all(has(doc, k) for k in keys)
]
def grouping_keys(doc):
return tuple(get(doc, k) for k in keys)
for vals, group in groupby(sorted(data, key=grouping_keys), key=grouping_keys):
doc = {} # type: ignore
for k, v in zip(keys, vals):
set_(doc, k, v)
yield doc, list(group)
def __eq__(self, other: object) -> bool:
"""
Check equality for MemoryStore
other: other MemoryStore to compare with
"""
if not isinstance(other, MemoryStore):
return False
fields = ["collection_name", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
name
property
readonly
¶
Name for the store
__eq__(self, other)
special
¶
Check equality for MemoryStore. other: other MemoryStore to compare with.
Source code in maggma/stores/mongolike.py
def __eq__(self, other: object) -> bool:
"""
Check equality for MemoryStore
other: other MemoryStore to compare with
"""
if not isinstance(other, MemoryStore):
return False
fields = ["collection_name", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__hash__(self)
special
¶
Hash for the store
Source code in maggma/stores/mongolike.py
def __hash__(self):
"""Hash for the store"""
return hash((self.name, self.last_updated_field))
__init__(self, collection_name='memory_db', **kwargs)
special
¶
Initializes the Memory Store
Parameters:
Name | Type | Description | Default
---|---|---|---
collection_name | str | name for the collection in memory | 'memory_db'
Source code in maggma/stores/mongolike.py
def __init__(self, collection_name: str = "memory_db", **kwargs):
"""
Initializes the Memory Store
Args:
collection_name: name for the collection in memory
"""
self.collection_name = collection_name
self.default_sort = None
self._coll = None
self.kwargs = kwargs
super(MongoStore, self).__init__(**kwargs) # noqa
close(self)
¶
Close up all collections
Source code in maggma/stores/mongolike.py
def close(self):
"""Close up all collections"""
self._coll.database.client.close()
connect(self, force_reset=False)
¶
Connect to the source data
Source code in maggma/stores/mongolike.py
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if self._coll is None or force_reset:
self._coll = mongomock.MongoClient().db[self.name] # type: ignore
groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Simple grouping function that will group documents by keys.
Parameters:
Name | Type | Description | Default
---|---|---|---
keys | Union[List[str], str] | fields to group documents | required
criteria | Optional[Dict] | PyMongo filter for documents to search in | None
properties | Union[Dict, List] | properties to return in grouped documents | None
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None
skip | int | number of documents to skip | 0
limit | int | limit on total number of documents returned | 0

Returns:

Type | Description
---|---
Iterator[Tuple[Dict, List[Dict]]] | generator returning tuples of (key, list of elements)
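For example, grouping a few documents by a shared field might look like this (field names are illustrative):

from maggma.stores import MemoryStore

store = MemoryStore("tasks", key="task_id")
store.connect()
store.update([
    {"task_id": 1, "state": "done"},
    {"task_id": 2, "state": "done"},
    {"task_id": 3, "state": "failed"},
])
# each iteration yields ({"state": <value>}, [matching docs])
for group_key, group_docs in store.groupby("state"):
    print(group_key["state"], len(group_docs))
store.close()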
Source code in maggma/stores/mongolike.py
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (key, list of elements)
"""
keys = keys if isinstance(keys, list) else [keys]
if properties is None:
properties = []
if isinstance(properties, dict):
properties = list(properties.keys())
data = [
doc
for doc in self.query(properties=keys + properties, criteria=criteria)
if all(has(doc, k) for k in keys)
]
def grouping_keys(doc):
return tuple(get(doc, k) for k in keys)
for vals, group in groupby(sorted(data, key=grouping_keys), key=grouping_keys):
doc = {} # type: ignore
for k, v in zip(keys, vals):
set_(doc, k, v)
yield doc, list(group)
MongoStore (Store)
¶
A Store that connects to a Mongo collection
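A connection sketch (the hostname, credentials, and database/collection names are placeholders for a real MongoDB deployment):

from maggma.stores import MongoStore

store = MongoStore(
    database="my_db",
    collection_name="tasks",
    host="localhost",
    port=27017,
    username="",  # leave username/password empty for unauthenticated servers
    password="",
    key="task_id",
)
store.connect()
print(store.count())
store.close()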
Source code in maggma/stores/mongolike.py
class MongoStore(Store):
"""
A Store that connects to a Mongo collection
"""
def __init__(
self,
database: str,
collection_name: str,
host: str = "localhost",
port: int = 27017,
username: str = "",
password: str = "",
ssh_tunnel: Optional[SSHTunnel] = None,
safe_update: bool = False,
auth_source: Optional[str] = None,
mongoclient_kwargs: Optional[Dict] = None,
default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
**kwargs,
):
"""
Args:
database: The database name
collection_name: The collection name
host: Hostname for the database
port: TCP port to connect to
username: Username for the collection
password: Password to connect with
safe_update: fail gracefully on DocumentTooLarge errors on update
auth_source: The database to authenticate on. Defaults to the database name.
default_sort: Default sort field and direction to use when querying. Can be used to
ensure determinacy in query results.
"""
self.database = database
self.collection_name = collection_name
self.host = host
self.port = port
self.username = username
self.password = password
self.ssh_tunnel = ssh_tunnel
self.safe_update = safe_update
self.default_sort = default_sort
self._coll = None # type: ignore
self.kwargs = kwargs
if auth_source is None:
auth_source = self.database
self.auth_source = auth_source
self.mongoclient_kwargs = mongoclient_kwargs or {}
super().__init__(**kwargs)
@property
def name(self) -> str:
"""
Return a string representing this data source
"""
return f"mongo://{self.host}/{self.database}/{self.collection_name}"
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if self._coll is None or force_reset:
if self.ssh_tunnel is None:
host = self.host
port = self.port
else:
self.ssh_tunnel.start()
host, port = self.ssh_tunnel.local_address
conn: MongoClient = (
MongoClient(
host=host,
port=port,
username=self.username,
password=self.password,
authSource=self.auth_source,
**self.mongoclient_kwargs,
)
if self.username != ""
else MongoClient(host, port, **self.mongoclient_kwargs)
)
db = conn[self.database]
self._coll = db[self.collection_name] # type: ignore
def __hash__(self) -> int:
"""Hash for MongoStore"""
return hash((self.database, self.collection_name, self.last_updated_field))
@classmethod
def from_db_file(cls, filename: str, **kwargs):
"""
Convenience method to construct MongoStore from db_file
from old QueryEngine format
"""
kwargs = loadfn(filename)
if "collection" in kwargs:
kwargs["collection_name"] = kwargs.pop("collection")
# Get rid of aliases from traditional query engine db docs
kwargs.pop("aliases", None)
return cls(**kwargs)
@classmethod
def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
"""
Convenience method to construct MongoStore from a launchpad file
Note: A launchpad file is a special formatted yaml file used in fireworks
Returns:
"""
with open(lp_file, "r") as f:
lp_creds = yaml.safe_load(f.read())
db_creds = lp_creds.copy()
db_creds["database"] = db_creds["name"]
for key in list(db_creds.keys()):
if key not in ["database", "host", "port", "username", "password"]:
db_creds.pop(key)
db_creds["collection_name"] = collection_name
return cls(**db_creds, **kwargs)
def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
"""
Get all distinct values for a field
Args:
field: the field(s) to get distinct values for
criteria: PyMongo filter for documents to search in
"""
criteria = criteria or {}
try:
distinct_vals = self._collection.distinct(field, criteria)
except (OperationFailure, DocumentTooLarge):
distinct_vals = [
d["_id"]
for d in self._collection.aggregate(
[{"$match": criteria}, {"$group": {"_id": f"${field}"}}]
)
]
if all(isinstance(d, list) for d in filter(None, distinct_vals)): # type: ignore
distinct_vals = list(chain.from_iterable(filter(None, distinct_vals)))
return distinct_vals if distinct_vals is not None else []
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (key, list of docs)
"""
pipeline = []
if isinstance(keys, str):
keys = [keys]
if properties is None:
properties = []
if isinstance(properties, dict):
properties = list(properties.keys())
if criteria is not None:
pipeline.append({"$match": criteria})
if len(properties) > 0:
pipeline.append({"$project": {p: 1 for p in properties + keys}})
alpha = "abcdefghijklmnopqrstuvwxyz"
group_id = {letter: f"${key}" for letter, key in zip(alpha, keys)}
pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
for d in self._collection.aggregate(pipeline, allowDiskUse=True):
id_doc = {} # type: ignore
for letter, key in group_id.items():
if has(d["_id"], letter):
set_(id_doc, key[1:], d["_id"][letter])
yield (id_doc, d["docs"])
@classmethod
def from_collection(cls, collection):
"""
Generates a MongoStore from a pymongo collection object
This is not a fully safe operation as it gives dummy information to the MongoStore
As a result, this will not serialize and can not reset its connection
Args:
collection: the PyMongo collection to create a MongoStore around
"""
# TODO: How do we make this safer?
coll_name = collection.name
db_name = collection.database.name
store = cls(db_name, coll_name)
store._coll = collection
return store
@property
def _collection(self):
"""Property referring to underlying pymongo collection"""
if self._coll is None:
raise StoreError(
"Must connect Mongo-like store before attempting to use it"
)
return self._coll
def count(
self,
criteria: Optional[Dict] = None,
hint: Optional[Dict[str, Union[Sort, int]]] = None,
) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
hint: Dictionary of indexes to use as hints for query optimizer.
Keys are field names and values are 1 for ascending or -1 for descending.
"""
criteria = criteria if criteria else {}
hint_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in hint.items()
]
if hint
else None
)
if hint_list is not None: # pragma: no cover
return self._collection.count_documents(filter=criteria, hint=hint_list)
return (
self._collection.count_documents(filter=criteria)
if criteria
else self._collection.estimated_document_count()
)
def query( # type: ignore
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
hint: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
**kwargs,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
hint: Dictionary of indexes to use as hints for query optimizer.
Keys are field names and values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
mongoclient_kwargs: Dict of extra kwargs to pass to pymongo find.
"""
if isinstance(properties, list):
properties = {p: 1 for p in properties}
default_sort_formatted = None
if self.default_sort is not None:
default_sort_formatted = [
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in self.default_sort.items()
]
sort_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in sort.items()
]
if sort
else default_sort_formatted
)
hint_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in hint.items()
]
if hint
else None
)
for d in self._collection.find(
filter=criteria,
projection=properties,
skip=skip,
limit=limit,
sort=sort_list,
hint=hint_list,
**kwargs,
):
yield d
def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
"""
Tries to create an index and return true if it succeeded
Args:
key: single key to index
unique: Whether or not this index contains only unique keys
Returns:
bool indicating if the index exists/was created
"""
if confirm_field_index(self._collection, key):
return True
else:
try:
self._collection.create_index(key, unique=unique, background=True)
return True
except Exception:
return False
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
requests = []
if not isinstance(docs, list):
docs = [docs]
for d in docs:
d = jsanitize(d, allow_bson=True)
# document-level validation is optional
validates = True
if self.validator:
validates = self.validator.is_valid(d)
if not validates:
if self.validator.strict:
raise ValueError(self.validator.validation_errors(d))
else:
self.logger.error(self.validator.validation_errors(d))
if validates:
key = key or self.key
if isinstance(key, list):
search_doc = {k: d[k] for k in key}
else:
search_doc = {key: d[key]}
requests.append(ReplaceOne(search_doc, d, upsert=True))
if len(requests) > 0:
try:
self._collection.bulk_write(requests, ordered=False)
except (OperationFailure, DocumentTooLarge) as e:
if self.safe_update:
for req in requests:
req._filter
try:
self._collection.bulk_write([req], ordered=False)
except (OperationFailure, DocumentTooLarge):
self.logger.error(
f"Could not upload document for {req._filter} as it was too large for Mongo"
)
else:
raise e
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
"""
self._collection.delete_many(filter=criteria)
def close(self):
"""Close up all collections"""
self._collection.database.client.close()
self._coll = None
if self.ssh_tunnel is not None:
self.ssh_tunnel.stop()
def __eq__(self, other: object) -> bool:
"""
Check equality for MongoStore
other: other mongostore to compare with
"""
if not isinstance(other, MongoStore):
return False
fields = ["database", "collection_name", "host", "port", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
name: str
property
readonly
¶
Return a string representing this data source
__eq__(self, other)
special
¶
Check equality for MongoStore. other: other MongoStore to compare with.
Source code in maggma/stores/mongolike.py
def __eq__(self, other: object) -> bool:
"""
Check equality for MongoStore
other: other mongostore to compare with
"""
if not isinstance(other, MongoStore):
return False
fields = ["database", "collection_name", "host", "port", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__hash__(self)
special
¶
Hash for MongoStore
Source code in maggma/stores/mongolike.py
def __hash__(self) -> int:
"""Hash for MongoStore"""
return hash((self.database, self.collection_name, self.last_updated_field))
__init__(self, database, collection_name, host='localhost', port=27017, username='', password='', ssh_tunnel=None, safe_update=False, auth_source=None, mongoclient_kwargs=None, default_sort=None, **kwargs)
special
¶
Parameters:
Name | Type | Description | Default
---|---|---|---
database | str | The database name | required
collection_name | str | The collection name | required
host | str | Hostname for the database | 'localhost'
port | int | TCP port to connect to | 27017
username | str | Username for the collection | ''
password | str | Password to connect with | ''
safe_update | bool | fail gracefully on DocumentTooLarge errors on update | False
auth_source | Optional[str] | The database to authenticate on. Defaults to the database name. | None
default_sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Default sort field and direction to use when querying. Can be used to ensure determinacy in query results. | None
Source code in maggma/stores/mongolike.py
def __init__(
self,
database: str,
collection_name: str,
host: str = "localhost",
port: int = 27017,
username: str = "",
password: str = "",
ssh_tunnel: Optional[SSHTunnel] = None,
safe_update: bool = False,
auth_source: Optional[str] = None,
mongoclient_kwargs: Optional[Dict] = None,
default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
**kwargs,
):
"""
Args:
database: The database name
collection_name: The collection name
host: Hostname for the database
port: TCP port to connect to
username: Username for the collection
password: Password to connect with
safe_update: fail gracefully on DocumentTooLarge errors on update
auth_source: The database to authenticate on. Defaults to the database name.
default_sort: Default sort field and direction to use when querying. Can be used to
ensure determinacy in query results.
"""
self.database = database
self.collection_name = collection_name
self.host = host
self.port = port
self.username = username
self.password = password
self.ssh_tunnel = ssh_tunnel
self.safe_update = safe_update
self.default_sort = default_sort
self._coll = None # type: ignore
self.kwargs = kwargs
if auth_source is None:
auth_source = self.database
self.auth_source = auth_source
self.mongoclient_kwargs = mongoclient_kwargs or {}
super().__init__(**kwargs)
close(self)
¶
Close up all collections
Source code in maggma/stores/mongolike.py
def close(self):
"""Close up all collections"""
self._collection.database.client.close()
self._coll = None
if self.ssh_tunnel is not None:
self.ssh_tunnel.stop()
connect(self, force_reset=False)
¶
Connect to the source data
Source code in maggma/stores/mongolike.py
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if self._coll is None or force_reset:
if self.ssh_tunnel is None:
host = self.host
port = self.port
else:
self.ssh_tunnel.start()
host, port = self.ssh_tunnel.local_address
conn: MongoClient = (
MongoClient(
host=host,
port=port,
username=self.username,
password=self.password,
authSource=self.auth_source,
**self.mongoclient_kwargs,
)
if self.username != ""
else MongoClient(host, port, **self.mongoclient_kwargs)
)
db = conn[self.database]
self._coll = db[self.collection_name] # type: ignore
count(self, criteria=None, hint=None)
¶
Counts the number of documents matching the query criteria
Parameters:
Name | Type | Description | Default
---|---|---|---
criteria | Optional[Dict] | PyMongo filter for documents to count in | None
hint | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending. | None
Source code in maggma/stores/mongolike.py
def count(
self,
criteria: Optional[Dict] = None,
hint: Optional[Dict[str, Union[Sort, int]]] = None,
) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
hint: Dictionary of indexes to use as hints for query optimizer.
Keys are field names and values are 1 for ascending or -1 for descending.
"""
criteria = criteria if criteria else {}
hint_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in hint.items()
]
if hint
else None
)
if hint_list is not None: # pragma: no cover
return self._collection.count_documents(filter=criteria, hint=hint_list)
return (
self._collection.count_documents(filter=criteria)
if criteria
else self._collection.estimated_document_count()
)
distinct(self, field, criteria=None, all_exist=False)
¶
Get all distinct values for a field
Parameters:
Name | Type | Description | Default
---|---|---|---
field | str | the field(s) to get distinct values for | required
criteria | Optional[Dict] | PyMongo filter for documents to search in | None
Source code in maggma/stores/mongolike.py
def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
"""
Get all distinct values for a field
Args:
field: the field(s) to get distinct values for
criteria: PyMongo filter for documents to search in
"""
criteria = criteria or {}
try:
distinct_vals = self._collection.distinct(field, criteria)
except (OperationFailure, DocumentTooLarge):
distinct_vals = [
d["_id"]
for d in self._collection.aggregate(
[{"$match": criteria}, {"$group": {"_id": f"${field}"}}]
)
]
if all(isinstance(d, list) for d in filter(None, distinct_vals)): # type: ignore
distinct_vals = list(chain.from_iterable(filter(None, distinct_vals)))
return distinct_vals if distinct_vals is not None else []
ensure_index(self, key, unique=False)
¶
Tries to create an index and return true if it succeeded
Parameters:
Name | Type | Description | Default
---|---|---|---
key | str | single key to index | required
unique | Optional[bool] | Whether or not this index contains only unique keys | False

Returns:

Type | Description
---|---
bool | bool indicating if the index exists/was created
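For example, the sketch below (placeholder names, assuming a reachable MongoDB) creates a unique index on the store's key field:

from maggma.stores import MongoStore

store = MongoStore(database="my_db", collection_name="tasks", key="task_id")
store.connect()
print(store.ensure_index("task_id", unique=True))  # True if the index exists or was created
store.close()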
Source code in maggma/stores/mongolike.py
def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
"""
Tries to create an index and return true if it succeeded
Args:
key: single key to index
unique: Whether or not this index contains only unique keys
Returns:
bool indicating if the index exists/was created
"""
if confirm_field_index(self._collection, key):
return True
else:
try:
self._collection.create_index(key, unique=unique, background=True)
return True
except Exception:
return False
from_collection(collection)
classmethod
¶
Generates a MongoStore from a pymongo collection object. This is not a fully safe operation as it gives dummy information to the MongoStore. As a result, this will not serialize and cannot reset its connection.
Parameters:
Name | Type | Description | Default
---|---|---|---
collection | | the PyMongo collection to create a MongoStore around | required
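A sketch of wrapping an existing pymongo collection (connection details are placeholders). As noted above, the resulting store will not serialize and cannot reset its connection:

from pymongo import MongoClient
from maggma.stores import MongoStore

# connect with pymongo directly, then wrap the collection in a MongoStore
coll = MongoClient("localhost", 27017)["my_db"]["tasks"]
store = MongoStore.from_collection(coll)
print(store.count())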
Source code in maggma/stores/mongolike.py
@classmethod
def from_collection(cls, collection):
"""
Generates a MongoStore from a pymongo collection object
This is not a fully safe operation as it gives dummy information to the MongoStore
As a result, this will not serialize and can not reset its connection
Args:
collection: the PyMongo collection to create a MongoStore around
"""
# TODO: How do we make this safer?
coll_name = collection.name
db_name = collection.database.name
store = cls(db_name, coll_name)
store._coll = collection
return store
from_db_file(filename, **kwargs)
classmethod
¶
Convenience method to construct MongoStore from db_file from old QueryEngine format
Source code in maggma/stores/mongolike.py
@classmethod
def from_db_file(cls, filename: str, **kwargs):
"""
Convenience method to construct MongoStore from db_file
from old QueryEngine format
"""
kwargs = loadfn(filename)
if "collection" in kwargs:
kwargs["collection_name"] = kwargs.pop("collection")
# Get rid of aliases from traditional query engine db docs
kwargs.pop("aliases", None)
return cls(**kwargs)
from_launchpad_file(lp_file, collection_name, **kwargs)
classmethod
¶
Convenience method to construct MongoStore from a launchpad file
Note: A launchpad file is a specially formatted yaml file used in FireWorks
Source code in maggma/stores/mongolike.py
@classmethod
def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
"""
Convenience method to construct MongoStore from a launchpad file
Note: A launchpad file is a special formatted yaml file used in fireworks
Returns:
"""
with open(lp_file, "r") as f:
lp_creds = yaml.safe_load(f.read())
db_creds = lp_creds.copy()
db_creds["database"] = db_creds["name"]
for key in list(db_creds.keys()):
if key not in ["database", "host", "port", "username", "password"]:
db_creds.pop(key)
db_creds["collection_name"] = collection_name
return cls(**db_creds, **kwargs)
groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Simple grouping function that will group documents by keys.
Parameters:
Name | Type | Description | Default
---|---|---|---
keys | Union[List[str], str] | fields to group documents | required
criteria | Optional[Dict] | PyMongo filter for documents to search in | None
properties | Union[Dict, List] | properties to return in grouped documents | None
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None
skip | int | number of documents to skip | 0
limit | int | limit on total number of documents returned | 0

Returns:

Type | Description
---|---
Iterator[Tuple[Dict, List[Dict]]] | generator returning tuples of (key, list of docs)
Source code in maggma/stores/mongolike.py
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (key, list of docs)
"""
pipeline = []
if isinstance(keys, str):
keys = [keys]
if properties is None:
properties = []
if isinstance(properties, dict):
properties = list(properties.keys())
if criteria is not None:
pipeline.append({"$match": criteria})
if len(properties) > 0:
pipeline.append({"$project": {p: 1 for p in properties + keys}})
alpha = "abcdefghijklmnopqrstuvwxyz"
group_id = {letter: f"${key}" for letter, key in zip(alpha, keys)}
pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
for d in self._collection.aggregate(pipeline, allowDiskUse=True):
id_doc = {} # type: ignore
for letter, key in group_id.items():
if has(d["_id"], letter):
set_(id_doc, key[1:], d["_id"][letter])
yield (id_doc, d["docs"])
query(self, criteria=None, properties=None, sort=None, hint=None, skip=0, limit=0, **kwargs)
¶
Queries the Store for a set of documents
Parameters:
Name | Type | Description | Default
---|---|---|---
criteria | Optional[Dict] | PyMongo filter for documents to search in | None
properties | Union[Dict, List] | properties to return in the documents | None
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None
hint | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending. | None
skip | int | number of documents to skip | 0
limit | int | limit on total number of documents returned | 0
mongoclient_kwargs | | Dict of extra kwargs to pass to pymongo find. | required
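For instance, a filtered, projected, and sorted query might look like the sketch below. MemoryStore inherits the same query signature, so the example runs without a database; the field names are illustrative:

from maggma.stores import MemoryStore

store = MemoryStore("tasks", key="task_id")
store.connect()
store.update([{"task_id": i, "energy": -0.1 * i, "state": "done"} for i in range(5)])

# return only task_id and energy for matching documents, lowest energy first
results = list(
    store.query(
        criteria={"state": "done"},
        properties=["task_id", "energy"],
        sort={"energy": 1},
        limit=3,
    )
)
store.close()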
Source code in maggma/stores/mongolike.py
def query( # type: ignore
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
hint: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
**kwargs,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
hint: Dictionary of indexes to use as hints for query optimizer.
Keys are field names and values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
mongoclient_kwargs: Dict of extra kwargs to pass to pymongo find.
"""
if isinstance(properties, list):
properties = {p: 1 for p in properties}
default_sort_formatted = None
if self.default_sort is not None:
default_sort_formatted = [
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in self.default_sort.items()
]
sort_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in sort.items()
]
if sort
else default_sort_formatted
)
hint_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in hint.items()
]
if hint
else None
)
for d in self._collection.find(
filter=criteria,
projection=properties,
skip=skip,
limit=limit,
sort=sort_list,
hint=hint_list,
**kwargs,
):
yield d
remove_docs(self, criteria)
¶
Remove docs matching the query dictionary
Parameters:
Name | Type | Description | Default
---|---|---|---
criteria | Dict | query dictionary to match | required
Source code in maggma/stores/mongolike.py
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
"""
self._collection.delete_many(filter=criteria)
update(self, docs, key=None)
¶
Update documents into the Store
Parameters:
Name | Type | Description | Default
---|---|---|---
docs | Union[List[Dict], Dict] | the document or list of documents to update | required
key | Union[List, str] | field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used | None
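A sketch of upserting documents, including a multi-field key (names are illustrative; MemoryStore is used so the snippet runs without a database):

from maggma.stores import MemoryStore

store = MemoryStore("results", key="task_id")
store.connect()

# upsert using the store's key field
store.update({"task_id": 42, "energy": -1.5})

# upsert using a composite key instead of the store's default
store.update([{"task_id": 42, "run": 2, "energy": -1.7}], key=["task_id", "run"])

print(store.count())  # 2: the first document has no "run" field, so a second document is inserted
store.close()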
Source code in maggma/stores/mongolike.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
requests = []
if not isinstance(docs, list):
docs = [docs]
for d in docs:
d = jsanitize(d, allow_bson=True)
# document-level validation is optional
validates = True
if self.validator:
validates = self.validator.is_valid(d)
if not validates:
if self.validator.strict:
raise ValueError(self.validator.validation_errors(d))
else:
self.logger.error(self.validator.validation_errors(d))
if validates:
key = key or self.key
if isinstance(key, list):
search_doc = {k: d[k] for k in key}
else:
search_doc = {key: d[key]}
requests.append(ReplaceOne(search_doc, d, upsert=True))
if len(requests) > 0:
try:
self._collection.bulk_write(requests, ordered=False)
except (OperationFailure, DocumentTooLarge) as e:
if self.safe_update:
for req in requests:
req._filter
try:
self._collection.bulk_write([req], ordered=False)
except (OperationFailure, DocumentTooLarge):
self.logger.error(
f"Could not upload document for {req._filter} as it was too large for Mongo"
)
else:
raise e
MongoURIStore (MongoStore)
¶
A Store that connects to a Mongo collection via a URI. This is expected to be a special mongodb+srv:// URI that includes client parameters via TXT records.
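A connection sketch (the URI below is a placeholder; when no database argument is given, the database name is parsed from the URI path, here my_db):

from maggma.stores import MongoURIStore

store = MongoURIStore(
    uri="mongodb+srv://user:pass@cluster.example.net/my_db",
    collection_name="tasks",
    key="task_id",
)
store.connect()
print(store.count())
store.close()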
Source code in maggma/stores/mongolike.py
class MongoURIStore(MongoStore):
"""
A Store that connects to a Mongo collection via a URI
This is expected to be a special mongodb+srv:// URIs that include
client parameters via TXT records
"""
def __init__(
self,
uri: str,
collection_name: str,
database: str = None,
ssh_tunnel: Optional[SSHTunnel] = None,
mongoclient_kwargs: Optional[Dict] = None,
default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
**kwargs,
):
"""
Args:
uri: MongoDB+SRV URI
database: database to connect to
collection_name: The collection name
default_sort: Default sort field and direction to use when querying. Can be used to
ensure determinacy in query results.
"""
self.uri = uri
self.ssh_tunnel = ssh_tunnel
self.default_sort = default_sort
self.mongoclient_kwargs = mongoclient_kwargs or {}
# parse the dbname from the uri
if database is None:
d_uri = uri_parser.parse_uri(uri)
if d_uri["database"] is None:
raise ConfigurationError(
"If database name is not supplied, a database must be set in the uri"
)
self.database = d_uri["database"]
else:
self.database = database
self.collection_name = collection_name
self.kwargs = kwargs
self._coll = None
super(MongoStore, self).__init__(**kwargs) # lgtm
@property
def name(self) -> str:
"""
Return a string representing this data source
"""
# TODO: This is not very safe since it exposes the username/password info
return self.uri
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if self._coll is None or force_reset: # pragma: no cover
conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
db = conn[self.database]
self._coll = db[self.collection_name] # type: ignore
name: str
property
readonly
¶
Return a string representing this data source
__init__(self, uri, collection_name, database=None, ssh_tunnel=None, mongoclient_kwargs=None, default_sort=None, **kwargs)
special
¶
Parameters:
Name | Type | Description | Default
---|---|---|---
uri | str | MongoDB+SRV URI | required
database | str | database to connect to | None
collection_name | str | The collection name | required
default_sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Default sort field and direction to use when querying. Can be used to ensure determinacy in query results. | None
Source code in maggma/stores/mongolike.py
def __init__(
self,
uri: str,
collection_name: str,
database: str = None,
ssh_tunnel: Optional[SSHTunnel] = None,
mongoclient_kwargs: Optional[Dict] = None,
default_sort: Optional[Dict[str, Union[Sort, int]]] = None,
**kwargs,
):
"""
Args:
uri: MongoDB+SRV URI
database: database to connect to
collection_name: The collection name
default_sort: Default sort field and direction to use when querying. Can be used to
ensure determinacy in query results.
"""
self.uri = uri
self.ssh_tunnel = ssh_tunnel
self.default_sort = default_sort
self.mongoclient_kwargs = mongoclient_kwargs or {}
# parse the dbname from the uri
if database is None:
d_uri = uri_parser.parse_uri(uri)
if d_uri["database"] is None:
raise ConfigurationError(
"If database name is not supplied, a database must be set in the uri"
)
self.database = d_uri["database"]
else:
self.database = database
self.collection_name = collection_name
self.kwargs = kwargs
self._coll = None
super(MongoStore, self).__init__(**kwargs) # lgtm
connect(self, force_reset=False)
¶
Connect to the source data
Source code in maggma/stores/mongolike.py
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if self._coll is None or force_reset: # pragma: no cover
conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
db = conn[self.database]
self._coll = db[self.collection_name] # type: ignore
MontyStore (MemoryStore)
¶
A MongoDB compatible store that uses on disk files for storage.
This is handled under the hood using MontyDB. A number of on-disk storage options are available but MontyDB provides a mongo style interface for all options. The options include:
- sqlite: Uses an sqlite database to store documents.
- lightning: Uses Lightning Memory-Mapped Database (LMDB) for storage. This can provide fast read and write times but requires lmdb to be installed (in most cases this can be achieved using pip install lmdb).
- flatfile: Uses a system of flat json files. This is not recommended as multiple simultaneous connections to the store will not work correctly.
See the MontyDB repository for more information: https://github.com/davidlatwe/montydb
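A sketch using the default sqlite backend and persisting documents to a local directory (the path and key are illustrative; the montydb package must be installed):

from maggma.stores.mongolike import MontyStore

# documents are stored on disk under ./montydb_data using the sqlite backend
store = MontyStore("tasks", database_path="./montydb_data", key="task_id")
store.connect()
store.update({"task_id": 1, "status": "done"})
print(store.count())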
Source code in maggma/stores/mongolike.py
class MontyStore(MemoryStore):
"""
A MongoDB compatible store that uses on disk files for storage.
This is handled under the hood using MontyDB. A number of on-disk storage options
are available but MontyDB provides a mongo style interface for all options. The
options include:
- sqlite: Uses an sqlite database to store documents.
- lightning: Uses Lightning Memory-Mapped Database (LMDB) for storage. This can
provide fast read and write times but requires lmdb to be installed (in most cases
this can be achieved using ``pip install lmdb``).
- flatfile: Uses a system of flat json files. This is not recommended as multiple
simultaneous connections to the store will not work correctly.
See the MontyDB repository for more information: https://github.com/davidlatwe/montydb
"""
def __init__(
self,
collection_name,
database_path: str = None,
database_name: str = "db",
storage: str = "sqlite",
storage_kwargs: Optional[dict] = None,
client_kwargs: Optional[dict] = None,
**kwargs,
):
"""
Initializes the Monty Store.
Args:
collection_name: Name for the collection.
database_path: Path to on-disk database files. If None, the current working
directory will be used.
database_name: The database name.
storage: The storage type. Options include "sqlite", "lightning", "flatfile".
storage_kwargs: Keyword arguments passed to ``montydb.set_storage``.
client_kwargs: Keyword arguments passed to the ``montydb.MontyClient``
constructor.
**kwargs: Additional keyword arguments passed to the Store constructor.
"""
if database_path is None:
database_path = str(Path.cwd())
self.database_path = database_path
self.database_name = database_name
self.collection_name = collection_name
self._coll = None # type: ignore
self.default_sort = None
self.ssh_tunnel = None # This is to fix issues with the tunnel on close
self.kwargs = kwargs
self.storage = storage
self.storage_kwargs = storage_kwargs or {
"use_bson": True,
"monty_version": "4.0",
}
self.client_kwargs = client_kwargs or {}
super(MongoStore, self).__init__(**kwargs) # noqa
def connect(self, force_reset: bool = False):
"""
Connect to the database store.
Args:
force_reset: Force connection reset.
"""
from montydb import set_storage, MontyClient # type: ignore
set_storage(self.database_path, storage=self.storage, **self.storage_kwargs)
client = MontyClient(self.database_path, **self.client_kwargs)
if not self._coll or force_reset:
self._coll = client["db"][self.collection_name]
@property
def name(self) -> str:
"""Return a string representing this data source."""
return f"monty://{self.database_path}/{self.database}/{self.collection_name}"
def count(
self,
criteria: Optional[Dict] = None,
hint: Optional[Dict[str, Union[Sort, int]]] = None,
) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
hint: Dictionary of indexes to use as hints for query optimizer.
Keys are field names and values are 1 for ascending or -1 for descending.
"""
criteria = criteria if criteria else {}
hint_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in hint.items()
]
if hint
else None
)
if hint_list is not None: # pragma: no cover
return self._collection.count_documents(filter=criteria, hint=hint_list)
return self._collection.count_documents(filter=criteria)
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store.
Args:
docs: The document or list of documents to update.
key: Field name(s) to determine uniqueness for a document, can be a list of
multiple fields, a single field, or None if the Store's key field is to be
used.
"""
if not isinstance(docs, list):
docs = [docs]
for d in docs:
d = jsanitize(d, allow_bson=True)
# document-level validation is optional
validates = True
if self.validator:
validates = self.validator.is_valid(d)
if not validates:
if self.validator.strict:
raise ValueError(self.validator.validation_errors(d))
else:
self.logger.error(self.validator.validation_errors(d))
if validates:
key = key or self.key
if isinstance(key, list):
search_doc = {k: d[k] for k in key}
else:
search_doc = {key: d[key]}
self._collection.replace_one(search_doc, d, upsert=True)
name: str
property
readonly
¶
Return a string representing this data source.
__init__(self, collection_name, database_path=None, database_name='db', storage='sqlite', storage_kwargs=None, client_kwargs=None, **kwargs)
special
¶
Initializes the Monty Store.
Parameters:
Name | Type | Description | Default
---|---|---|---
collection_name | | Name for the collection. | required
database_path | str | Path to on-disk database files. If None, the current working directory will be used. | None
database_name | str | The database name. | 'db'
storage | str | The storage type. Options include "sqlite", "lightning", "flatfile". | 'sqlite'
storage_kwargs | Optional[dict] | Keyword arguments passed to montydb.set_storage. | None
client_kwargs | Optional[dict] | Keyword arguments passed to the montydb.MontyClient constructor. | None
**kwargs | | Additional keyword arguments passed to the Store constructor. | {}
Source code in maggma/stores/mongolike.py
def __init__(
self,
collection_name,
database_path: str = None,
database_name: str = "db",
storage: str = "sqlite",
storage_kwargs: Optional[dict] = None,
client_kwargs: Optional[dict] = None,
**kwargs,
):
"""
Initializes the Monty Store.
Args:
collection_name: Name for the collection.
database_path: Path to on-disk database files. If None, the current working
directory will be used.
database_name: The database name.
storage: The storage type. Options include "sqlite", "lightning", "flatfile".
storage_kwargs: Keyword arguments passed to ``montydb.set_storage``.
client_kwargs: Keyword arguments passed to the ``montydb.MontyClient``
constructor.
**kwargs: Additional keyword arguments passed to the Store constructor.
"""
if database_path is None:
database_path = str(Path.cwd())
self.database_path = database_path
self.database_name = database_name
self.collection_name = collection_name
self._coll = None # type: ignore
self.default_sort = None
self.ssh_tunnel = None # This is to fix issues with the tunnel on close
self.kwargs = kwargs
self.storage = storage
self.storage_kwargs = storage_kwargs or {
"use_bson": True,
"monty_version": "4.0",
}
self.client_kwargs = client_kwargs or {}
super(MongoStore, self).__init__(**kwargs) # noqa
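As a rough sketch (the collection name, key, and on-disk path below are hypothetical), a local MontyStore can be created and connected like this:

```python
from maggma.stores.mongolike import MontyStore

# Hypothetical example: a file-backed store under /tmp using the default
# "sqlite" storage backend. `key` is forwarded to the Store constructor.
store = MontyStore(
    collection_name="tasks",
    database_path="/tmp/my_montydb",
    key="task_id",
)
store.connect()  # configures montydb storage and opens the collection
```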
connect(self, force_reset=False)
¶
Connect to the database store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force_reset | bool | Force connection reset. | False |
Source code in maggma/stores/mongolike.py
def connect(self, force_reset: bool = False):
"""
Connect to the database store.
Args:
force_reset: Force connection reset.
"""
from montydb import set_storage, MontyClient # type: ignore
set_storage(self.database_path, storage=self.storage, **self.storage_kwargs)
client = MontyClient(self.database_path, **self.client_kwargs)
if not self._coll or force_reset:
self._coll = client["db"][self.collection_name]
count(self, criteria=None, hint=None)
¶
Counts the number of documents matching the query criteria
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to count in | None |
hint | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending. | None |
Source code in maggma/stores/mongolike.py
def count(
self,
criteria: Optional[Dict] = None,
hint: Optional[Dict[str, Union[Sort, int]]] = None,
) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
hint: Dictionary of indexes to use as hints for query optimizer.
Keys are field names and values are 1 for ascending or -1 for descending.
"""
criteria = criteria if criteria else {}
hint_list = (
[
(k, Sort(v).value) if isinstance(v, int) else (k, v.value)
for k, v in hint.items()
]
if hint
else None
)
if hint_list is not None: # pragma: no cover
return self._collection.count_documents(filter=criteria, hint=hint_list)
return self._collection.count_documents(filter=criteria)
update(self, docs, key=None)
¶
Update documents into the Store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docs | Union[List[Dict], Dict] | The document or list of documents to update. | required |
key | Union[List, str] | Field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used. | None |
Source code in maggma/stores/mongolike.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store.
Args:
docs: The document or list of documents to update.
key: Field name(s) to determine uniqueness for a document, can be a list of
multiple fields, a single field, or None if the Store's key field is to be
used.
"""
if not isinstance(docs, list):
docs = [docs]
for d in docs:
d = jsanitize(d, allow_bson=True)
# document-level validation is optional
validates = True
if self.validator:
validates = self.validator.is_valid(d)
if not validates:
if self.validator.strict:
raise ValueError(self.validator.validation_errors(d))
else:
self.logger.error(self.validator.validation_errors(d))
if validates:
key = key or self.key
if isinstance(key, list):
search_doc = {k: d[k] for k in key}
else:
search_doc = {key: d[key]}
self._collection.replace_one(search_doc, d, upsert=True)
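Continuing the sketch above (assuming `store` is the connected MontyStore with key="task_id"), update() upserts documents by the key field and count() accepts a MongoDB-style filter:

```python
# Upsert two documents, matching on the store key ("task_id" in this sketch).
store.update(
    [
        {"task_id": 1, "energy": -1.5},
        {"task_id": 2, "energy": -2.0},
    ]
)

# Count documents matching a PyMongo-style filter.
n_low_energy = store.count({"energy": {"$lt": -1.0}})
```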
SSHTunnel (MSONable)
¶
Source code in maggma/stores/mongolike.py
class SSHTunnel(MSONable):
__TUNNELS: Dict[str, SSHTunnelForwarder] = {}
def __init__(
self,
tunnel_server_address: str,
remote_server_address: str,
username: Optional[str] = None,
password: Optional[str] = None,
private_key: Optional[str] = None,
**kwargs,
):
"""
Args:
tunnel_server_address: string address with port for the SSH tunnel server
remote_server_address: string address with port for the server to connect to
username: optional username for the ssh tunnel server
password: optional password for the ssh tunnel server; If a private_key is
supplied this password is assumed to be the private key password
private_key: ssh private key to authenticate to the tunnel server
kwargs: any extra args passed to the SSHTunnelForwarder
"""
self.tunnel_server_address = tunnel_server_address
self.remote_server_address = remote_server_address
self.username = username
self.password = password
self.private_key = private_key
self.kwargs = kwargs
if remote_server_address in SSHTunnel.__TUNNELS:
self.tunnel = SSHTunnel.__TUNNELS[remote_server_address]
else:
open_port = _find_free_port("127.0.0.1")
local_bind_address = ("127.0.0.1", open_port)
ssh_address, ssh_port = tunnel_server_address.split(":")
ssh_port = int(ssh_port) # type: ignore
remote_bind_address, remote_bind_port = remote_server_address.split(":")
remote_bind_port = int(remote_bind_port) # type: ignore
if private_key is not None:
ssh_password = None
ssh_private_key_password = password
else:
ssh_password = password
ssh_private_key_password = None
self.tunnel = SSHTunnelForwarder(
ssh_address_or_host=(ssh_address, ssh_port),
local_bind_address=local_bind_address,
remote_bind_address=(remote_bind_address, remote_bind_port),
ssh_username=username,
ssh_password=ssh_password,
ssh_private_key_password=ssh_private_key_password,
ssh_pkey=private_key,
**kwargs,
)
def start(self):
if not self.tunnel.is_active:
self.tunnel.start()
def stop(self):
if self.tunnel.tunnel_is_up:
self.tunnel.stop()
@property
def local_address(self) -> Tuple[str, int]:
return self.tunnel.local_bind_address
__init__(self, tunnel_server_address, remote_server_address, username=None, password=None, private_key=None, **kwargs)
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tunnel_server_address | str | string address with port for the SSH tunnel server | required |
remote_server_address | str | string address with port for the server to connect to | required |
username | Optional[str] | optional username for the ssh tunnel server | None |
password | Optional[str] | optional password for the ssh tunnel server; If a private_key is supplied this password is assumed to be the private key password | None |
private_key | Optional[str] | ssh private key to authenticate to the tunnel server | None |
kwargs | | any extra args passed to the SSHTunnelForwarder | {} |
Source code in maggma/stores/mongolike.py
def __init__(
self,
tunnel_server_address: str,
remote_server_address: str,
username: Optional[str] = None,
password: Optional[str] = None,
private_key: Optional[str] = None,
**kwargs,
):
"""
Args:
tunnel_server_address: string address with port for the SSH tunnel server
remote_server_address: string address with port for the server to connect to
username: optional username for the ssh tunnel server
password: optional password for the ssh tunnel server; If a private_key is
supplied this password is assumed to be the private key password
private_key: ssh private key to authenticate to the tunnel server
kwargs: any extra args passed to the SSHTunnelForwarder
"""
self.tunnel_server_address = tunnel_server_address
self.remote_server_address = remote_server_address
self.username = username
self.password = password
self.private_key = private_key
self.kwargs = kwargs
if remote_server_address in SSHTunnel.__TUNNELS:
self.tunnel = SSHTunnel.__TUNNELS[remote_server_address]
else:
open_port = _find_free_port("127.0.0.1")
local_bind_address = ("127.0.0.1", open_port)
ssh_address, ssh_port = tunnel_server_address.split(":")
ssh_port = int(ssh_port) # type: ignore
remote_bind_address, remote_bind_port = remote_server_address.split(":")
remote_bind_port = int(remote_bind_port) # type: ignore
if private_key is not None:
ssh_password = None
ssh_private_key_password = password
else:
ssh_password = password
ssh_private_key_password = None
self.tunnel = SSHTunnelForwarder(
ssh_address_or_host=(ssh_address, ssh_port),
local_bind_address=local_bind_address,
remote_bind_address=(remote_bind_address, remote_bind_port),
ssh_username=username,
ssh_password=ssh_password,
ssh_private_key_password=ssh_private_key_password,
ssh_pkey=private_key,
**kwargs,
)
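A minimal sketch of forwarding a remote MongoDB port through an SSH bastion (all addresses, usernames, and key paths below are hypothetical):

```python
from maggma.stores.mongolike import SSHTunnel

tunnel = SSHTunnel(
    tunnel_server_address="bastion.example.com:22",   # hypothetical SSH host:port
    remote_server_address="mongo.internal:27017",     # hypothetical remote host:port
    username="deploy",
    private_key="/home/deploy/.ssh/id_rsa",
)
tunnel.start()
host, port = tunnel.local_address  # local bind address that forwards to the remote server
# ...connect a MongoClient here, or pass the tunnel to a store that accepts ssh_tunnel...
tunnel.stop()
```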
Module defining a FileStore that enables accessing files in a local directory using typical maggma access patterns.
FileStore (MemoryStore)
¶
A Store for files on disk. Provides a common access method consistent with other stores. Each Item in the Store represents one file. Files can be organized into any type of directory structure.
A hash of the full path to each file is used to define a file_id that uniquely identifies each item.
Any metadata added to the items is written to a .json file in the root directory of the FileStore.
Source code in maggma/stores/file_store.py
class FileStore(MemoryStore):
"""
A Store for files on disk. Provides a common access method consistent with
other stores. Each Item in the Store represents one file. Files can be organized
into any type of directory structure.
A hash of the full path to each file is used to define a file_id that uniquely
identifies each item.
Any metadata added to the items is written to a .json file in the root directory
of the FileStore.
"""
def __init__(
self,
path: Union[str, Path],
file_filters: Optional[List] = None,
max_depth: Optional[int] = None,
read_only: bool = True,
include_orphans: bool = False,
json_name: str = "FileStore.json",
**kwargs,
):
"""
Initializes a FileStore
Args:
path: parent directory containing all files and subdirectories to process
file_filters: List of fnmatch patterns defining the files to be tracked by
the FileStore. Only files that match one of the patterns provided will
be included in the Store. If None (default), all files are included.
Examples: ["*.txt", "test-[abcd].txt"], etc.
See https://docs.python.org/3/library/fnmatch.html for full syntax
max_depth: The maximum depth to look into subdirectories. 0 = no recursion,
1 = include files 1 directory below the FileStore, etc.
None (default) will scan all files below
the FileStore root directory, regardless of depth.
read_only: If True (default), the .update() and .remove_docs()
methods are disabled, preventing any changes to the files on
disk. In addition, metadata cannot be written to disk.
include_orphans: Whether to include orphaned metadata records in query results.
Orphaned metadata records are records found in the local JSON file that can
no longer be associated to a file on disk. This can happen if a file is renamed
or deleted, or if the FileStore is re-initialized with a more restrictive
file_filters or max_depth argument. By default (False), these records
do not appear in query results. Nevertheless, the metadata records are
retained in the JSON file and the FileStore to prevent accidental data loss.
json_name: Name of the .json file to which metadata is saved. If read_only
is False, this file will be created in the root directory of the
FileStore.
kwargs: kwargs passed to MemoryStore.__init__()
"""
# this conditional block is needed in order to guarantee that the 'name'
# property, which is passed to `MemoryStore`, works correctly
# collection names passed to MemoryStore cannot end with '.'
if path == ".":
path = Path.cwd()
self.path = Path(path) if isinstance(path, str) else path
self.json_name = json_name
self.file_filters = file_filters if file_filters else ["*"]
self.collection_name = "file_store"
self.key = "file_id"
self.include_orphans = include_orphans
self.read_only = read_only
self.max_depth = max_depth
self.metadata_store = JSONStore(
paths=[str(self.path / self.json_name)],
read_only=self.read_only,
collection_name=self.collection_name,
key=self.key,
)
self.kwargs = kwargs
super().__init__(
collection_name=self.collection_name,
key=self.key,
**self.kwargs,
)
@property
def name(self) -> str:
"""
Return a string representing this data source
"""
return f"file://{self.path}"
def add_metadata(
self,
metadata: Dict = {},
query: Optional[Dict] = None,
auto_data: Callable[[Dict], Dict] = None,
**kwargs,
):
"""
Add metadata to a record in the FileStore, either manually or by computing it automatically
from another field, such as name or path (see auto_data).
Args:
metadata: dict of additional data to add to the records returned by query.
Note that any protected keys (such as 'name', 'path', etc.)
will be ignored.
query: Query passed to FileStore.query()
auto_data: A function that automatically computes metadata based on a field in
the record itself. The function must take in the item as a dict and
return a dict containing the desired metadata. A typical use case is
to assign metadata based on the name of a file. For example, for
data files named like `2022-04-01_april_fool_experiment.txt`, the
auto_data function could be:
def get_metadata_from_filename(d):
return {"date": d["name"].split("_")[0],
"test_name": d["name"].split("_")[1]
}
Note that in the case of conflict between manual and automatically
computed metadata (for example, if metadata={"name": "another_name"} was
supplied alongside the auto_data function above), the manually-supplied
metadata is used.
kwargs: kwargs passed to FileStore.query()
"""
# sanitize the metadata
filtered_metadata = self._filter_data(metadata)
updated_docs = []
for doc in self.query(query, **kwargs):
if auto_data:
extra_data = self._filter_data(auto_data(doc))
doc.update(extra_data)
doc.update(filtered_metadata)
updated_docs.append(doc)
self.update(updated_docs, key=self.key)
def read(self) -> List[Dict]:
"""
Iterate through all files in the Store folder and populate
the Store with dictionaries containing basic information about each file.
The keys of the documents added to the Store are
name: str = File name
path: Path = Absolute path of this file
parent: str = Name of the parent directory (if any)
file_id: str = Unique identifier for this file, computed from the hash
of its path relative to the base FileStore directory and
the file creation time. The key of this field is 'file_id'
by default but can be changed via the 'key' kwarg to
FileStore.__init__().
size: int = Size of this file in bytes
last_updated: datetime = Time this file was last modified
hash: str = Hash of the file contents
orphan: bool = Whether this record is an orphan
"""
file_list = []
# generate a list of files in subdirectories
for pattern in self.file_filters:
# list every file that matches the pattern
for f in self.path.rglob(pattern):
if f.is_file():
# ignore the .json file created by the Store
if f.name == self.json_name:
continue
# filter based on depth
depth = len(f.relative_to(self.path).parts) - 1
if self.max_depth is None or depth <= self.max_depth:
file_list.append(self._create_record_from_file(f))
return file_list
def _create_record_from_file(self, f: Union[str, Path]) -> Dict:
"""
Given the path to a file, return a Dict that constitutes a record of
basic information about that file. The keys in the returned dict
are:
name: str = File name
path: Path = Absolute path of this file
parent: str = Name of the parent directory (if any)
file_id: str = Unique identifier for this file, computed from the hash
of its path relative to the base FileStore directory and
the file creation time. The key of this field is 'file_id'
by default but can be changed via the 'key' kwarg to
FileStore.__init__().
size: int = Size of this file in bytes
last_updated: datetime = Time this file was last modified
hash: str = Hash of the file contents
orphan: bool = Whether this record is an orphan
"""
# make sure f is a Path object
if not isinstance(f, Path):
f = Path(f)
# compute the file_id from the relative path
relative_path = f.relative_to(self.path)
digest = hashlib.md5()
digest.update(str(relative_path).encode())
file_id = str(digest.hexdigest())
# hash the file contents
digest2 = hashlib.md5()
block_size = 128 * digest2.block_size
digest2.update(self.name.encode())
with open(f.as_posix(), "rb") as file:
buf = file.read(block_size)
digest2.update(buf)
content_hash = str(digest2.hexdigest())
d = {
"name": f.name,
"path": f,
"path_relative": relative_path,
"parent": f.parent.name,
"size": f.stat().st_size,
"last_updated": datetime.fromtimestamp(f.stat().st_mtime, tz=timezone.utc),
"orphan": False,
"hash": content_hash,
self.key: file_id,
}
return d
def connect(self, force_reset: bool = False):
"""
Connect to the source data
Read all the files in the directory, create corresponding File
items in the internal MemoryStore.
If there is a metadata .json file in the directory, read its
contents into the MemoryStore
Args:
force_reset: whether to reset the connection or not
"""
# read all files and place them in the MemoryStore
# use super.update to bypass the read_only guard statement
# because we want the file data to be populated in memory
super().connect()
super().update(self.read())
# now read any metadata from the .json file
try:
self.metadata_store.connect()
metadata = [d for d in self.metadata_store.query()]
except FileNotFoundError:
metadata = []
warnings.warn(
f"""
JSON file '{self.json_name}' not found. To create this file automatically, re-initialize
the FileStore with read_only=False.
"""
)
# merge metadata with file data and check for orphaned metadata
requests = []
found_orphans = False
key = self.key
file_ids = self.distinct(self.key)
for d in metadata:
if isinstance(key, list):
search_doc = {k: d[k] for k in key}
else:
search_doc = {key: d[key]}
if d[key] not in file_ids:
found_orphans = True
d.update({"orphan": True})
del d["_id"]
requests.append(UpdateOne(search_doc, {"$set": d}, upsert=True))
if found_orphans:
warnings.warn(
f"Orphaned metadata was found in {self.json_name}. This metadata"
"will be added to the store with {'orphan': True}"
)
if len(requests) > 0:
self._collection.bulk_write(requests, ordered=False)
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update items in the Store. Only possible if the store is not read only. Any new
fields that are added will be written to the JSON file in the root directory
of the FileStore.
Note that certain fields that come from file metadata on disk are protected and
cannot be updated with this method. This prevents the contents of the FileStore
from becoming out of sync with the files on which it is based. The protected fields
are keys in the dict returned by _create_record_from_file, e.g. 'name', 'parent',
'path', 'last_updated', 'hash', 'size', 'contents', and 'orphan'. The 'path_relative' and key fields are
retained to make each document in the JSON file identifiable by manual inspection.
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
if self.read_only:
raise StoreError(
"This Store is read-only. To enable file I/O, re-initialize the store with read_only=False."
)
super().update(docs, key)
data = [d for d in self.query()]
filtered_data = []
# remove fields that are populated by .read()
for d in data:
filtered_d = self._filter_data(d)
# don't write records that contain only file_id
if (
len(set(filtered_d.keys()).difference(set(["path_relative", self.key])))
!= 0
):
filtered_data.append(filtered_d)
self.metadata_store.update(filtered_data, self.key)
def _filter_data(self, d):
"""
Remove any protected keys from a dictionary
Args:
d: Dictionary whose keys are to be filtered
"""
filtered_d = {
k: v
for k, v in d.items()
if k not in PROTECTED_KEYS.union({self.last_updated_field})
}
return filtered_d
def query( # type: ignore
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
hint: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
contents_size_limit: Optional[int] = None,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
hint: Dictionary of indexes to use as hints for query optimizer.
Keys are field names and values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
contents_size_limit: Maximum file size in bytes for which to return contents.
The FileStore will attempt to read the file and populate the 'contents' key
with its content at query time, unless the file size is larger than this value.
"""
return_contents = False
criteria = criteria if criteria else {}
if criteria.get("orphan", None) is None:
if not self.include_orphans:
criteria.update({"orphan": False})
if criteria.get("contents"):
warnings.warn("'contents' is not a queryable field! Ignoring.")
if isinstance(properties, list):
properties = {p: 1 for p in properties}
orig_properties = properties.copy() if properties else None
if properties is None or properties.get("contents"):
return_contents = True
if properties is not None and return_contents:
# remove contents b/c it isn't stored in the MemoryStore
properties.pop("contents")
# add size and path to query so that file can be read
properties.update({"size": 1})
properties.update({"path": 1})
for d in super().query(
criteria=criteria,
properties=properties,
sort=sort,
hint=hint,
skip=skip,
limit=limit,
):
# add file contents to the returned documents, if appropriate
if return_contents:
if contents_size_limit is None or d["size"] <= contents_size_limit:
# attempt to read the file contents and inject into the document
# TODO - could add more logic for detecting different file types
# and more nuanced exception handling
try:
with zopen(d["path"], "r") as f:
data = f.read()
except Exception as e:
data = f"Unable to read: {e}"
elif d["size"] > contents_size_limit:
data = "Unable to read: file too large"
else:
data = "Unable to read: Unknown error"
d.update({"contents": data})
# remove size and path if not explicitly requested
if orig_properties is not None and "size" not in orig_properties:
d.pop("size")
if orig_properties is not None and "path" not in orig_properties:
d.pop("path")
yield d
def query_one(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
contents_size_limit: Optional[int] = None,
):
"""
Queries the Store for a single document
Args:
criteria: PyMongo filter for documents to search
properties: properties to return in the document
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
contents_size_limit: Maximum file size in bytes for which to return contents.
The FileStore will attempt to read the file and populate the 'contents' key
with its content at query time, unless the file size is larger than this value.
"""
return next(
self.query(
criteria=criteria,
properties=properties,
sort=sort,
contents_size_limit=contents_size_limit,
),
None,
)
def remove_docs(self, criteria: Dict, confirm: bool = False):
"""
Remove items matching the query dictionary.
Args:
criteria: query dictionary to match
confirm: Boolean flag to confirm that remove_docs should delete
files on disk. Default: False.
"""
if self.read_only:
raise StoreError(
"This Store is read-only. To enable file I/O, re-initialize the "
"store with read_only=False."
)
docs = [d for d in self.query(criteria)]
# this ensures that any modifications to criteria made by self.query
# (e.g., related to orphans or contents) are propagated through to the superclass
new_criteria = {"file_id": {"$in": [d["file_id"] for d in docs]}}
if len(docs) > 0 and not confirm:
raise StoreError(
f"Warning! This command is about to delete {len(docs)} items from disk! "
"If this is what you want, reissue this command with confirm=True."
)
for d in docs:
Path(d["path"]).unlink()
super().remove_docs(criteria=new_criteria)
name: str
property
readonly
¶
Return a string representing this data source
__init__(self, path, file_filters=None, max_depth=None, read_only=True, include_orphans=False, json_name='FileStore.json', **kwargs)
special
¶
Initializes a FileStore
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, pathlib.Path] | parent directory containing all files and subdirectories to process | required |
file_filters | Optional[List] | List of fnmatch patterns defining the files to be tracked by the FileStore. Only files that match one of the patterns provided will be included in the Store. If None (default), all files are included. Examples: ["*.txt", "test-[abcd].txt"], etc. See https://docs.python.org/3/library/fnmatch.html for full syntax | None |
max_depth | Optional[int] | The maximum depth to look into subdirectories. 0 = no recursion, 1 = include files 1 directory below the FileStore, etc. None (default) will scan all files below the FileStore root directory, regardless of depth. | None |
read_only | bool | If True (default), the .update() and .remove_docs() methods are disabled, preventing any changes to the files on disk. In addition, metadata cannot be written to disk. | True |
include_orphans | bool | Whether to include orphaned metadata records in query results. Orphaned metadata records are records found in the local JSON file that can no longer be associated to a file on disk. This can happen if a file is renamed or deleted, or if the FileStore is re-initialized with a more restrictive file_filters or max_depth argument. By default (False), these records do not appear in query results. Nevertheless, the metadata records are retained in the JSON file and the FileStore to prevent accidental data loss. | False |
json_name | str | Name of the .json file to which metadata is saved. If read_only is False, this file will be created in the root directory of the FileStore. | 'FileStore.json' |
kwargs | | kwargs passed to MemoryStore.__init__() | {} |
Source code in maggma/stores/file_store.py
def __init__(
self,
path: Union[str, Path],
file_filters: Optional[List] = None,
max_depth: Optional[int] = None,
read_only: bool = True,
include_orphans: bool = False,
json_name: str = "FileStore.json",
**kwargs,
):
"""
Initializes a FileStore
Args:
path: parent directory containing all files and subdirectories to process
file_filters: List of fnmatch patterns defining the files to be tracked by
the FileStore. Only files that match one of the patterns provided will
be included in the Store. If None (default), all files are included.
Examples: ["*.txt", "test-[abcd].txt"], etc.
See https://docs.python.org/3/library/fnmatch.html for full syntax
max_depth: The maximum depth to look into subdirectories. 0 = no recursion,
1 = include files 1 directory below the FileStore, etc.
None (default) will scan all files below
the FileStore root directory, regardless of depth.
read_only: If True (default), the .update() and .remove_docs()
methods are disabled, preventing any changes to the files on
disk. In addition, metadata cannot be written to disk.
include_orphans: Whether to include orphaned metadata records in query results.
Orphaned metadata records are records found in the local JSON file that can
no longer be associated to a file on disk. This can happen if a file is renamed
or deleted, or if the FileStore is re-initialized with a more restrictive
file_filters or max_depth argument. By default (False), these records
do not appear in query results. Nevertheless, the metadata records are
retained in the JSON file and the FileStore to prevent accidental data loss.
json_name: Name of the .json file to which metadata is saved. If read_only
is False, this file will be created in the root directory of the
FileStore.
kwargs: kwargs passed to MemoryStore.__init__()
"""
# this conditional block is needed in order to guarantee that the 'name'
# property, which is passed to `MemoryStore`, works correctly
# collection names passed to MemoryStore cannot end with '.'
if path == ".":
path = Path.cwd()
self.path = Path(path) if isinstance(path, str) else path
self.json_name = json_name
self.file_filters = file_filters if file_filters else ["*"]
self.collection_name = "file_store"
self.key = "file_id"
self.include_orphans = include_orphans
self.read_only = read_only
self.max_depth = max_depth
self.metadata_store = JSONStore(
paths=[str(self.path / self.json_name)],
read_only=self.read_only,
collection_name=self.collection_name,
key=self.key,
)
self.kwargs = kwargs
super().__init__(
collection_name=self.collection_name,
key=self.key,
**self.kwargs,
)
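For example, a read-only FileStore over a (hypothetical) directory of text files, scanning one directory level deep:

```python
from pathlib import Path
from maggma.stores.file_store import FileStore

fs = FileStore(
    path=Path("~/experiment_data").expanduser(),  # hypothetical directory
    file_filters=["*.txt"],
    max_depth=1,
    read_only=True,
)
fs.connect()        # scan the directory and populate the in-memory collection
print(fs.count())   # number of tracked files
```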
add_metadata(self, metadata={}, query=None, auto_data=None, **kwargs)
¶
Add metadata to a record in the FileStore, either manually or by computing it automatically from another field, such as name or path (see auto_data).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metadata | Dict | dict of additional data to add to the records returned by query. Note that any protected keys (such as 'name', 'path', etc.) will be ignored. | {} |
query | Optional[Dict] | Query passed to FileStore.query() | None |
auto_data | Callable[[Dict], Dict] | A function that automatically computes metadata based on a field in the record itself. The function must take in the item as a dict and return a dict containing the desired metadata. A typical use case is to assign metadata based on the name of a file, e.g. parsing a date and test name from a file named like `2022-04-01_april_fool_experiment.txt`. In case of conflict between manual and automatically computed metadata, the manually-supplied metadata is used. | None |
kwargs | | kwargs passed to FileStore.query() | {} |
Source code in maggma/stores/file_store.py
def add_metadata(
self,
metadata: Dict = {},
query: Optional[Dict] = None,
auto_data: Callable[[Dict], Dict] = None,
**kwargs,
):
"""
Add metadata to a record in the FileStore, either manually or by computing it automatically
from another field, such as name or path (see auto_data).
Args:
metadata: dict of additional data to add to the records returned by query.
Note that any protected keys (such as 'name', 'path', etc.)
will be ignored.
query: Query passed to FileStore.query()
auto_data: A function that automatically computes metadata based on a field in
the record itself. The function must take in the item as a dict and
return a dict containing the desired metadata. A typical use case is
to assign metadata based on the name of a file. For example, for
data files named like `2022-04-01_april_fool_experiment.txt`, the
auto_data function could be:
def get_metadata_from_filename(d):
return {"date": d["name"].split("_")[0],
"test_name": d["name"].split("_")[1]
}
Note that in the case of conflict between manual and automatically
computed metadata (for example, if metadata={"name": "another_name"} was
supplied alongside the auto_data function above), the manually-supplied
metadata is used.
kwargs: kwargs passed to FileStore.query()
"""
# sanitize the metadata
filtered_metadata = self._filter_data(metadata)
updated_docs = []
for doc in self.query(query, **kwargs):
if auto_data:
extra_data = self._filter_data(auto_data(doc))
doc.update(extra_data)
doc.update(filtered_metadata)
updated_docs.append(doc)
self.update(updated_docs, key=self.key)
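Following the docstring's file-naming example, a sketch of tagging records both manually and via auto_data (this requires a FileStore initialized with read_only=False; the 'project' field is hypothetical):

```python
def get_metadata_from_filename(d):
    # Parse metadata from names like "2022-04-01_april_fool_experiment.txt"
    return {
        "date": d["name"].split("_")[0],
        "test_name": d["name"].split("_")[1],
    }

fs.add_metadata(
    metadata={"project": "april_runs"},                       # manual metadata
    query={"name": "2022-04-01_april_fool_experiment.txt"},   # passed to FileStore.query()
    auto_data=get_metadata_from_filename,
)
```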
connect(self, force_reset=False)
¶
Connect to the source data
Read all the files in the directory, create corresponding File items in the internal MemoryStore.
If there is a metadata .json file in the directory, read its contents into the MemoryStore
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force_reset | bool | whether to reset the connection or not | False |
Source code in maggma/stores/file_store.py
def connect(self, force_reset: bool = False):
"""
Connect to the source data
Read all the files in the directory, create corresponding File
items in the internal MemoryStore.
If there is a metadata .json file in the directory, read its
contents into the MemoryStore
Args:
force_reset: whether to reset the connection or not
"""
# read all files and place them in the MemoryStore
# use super.update to bypass the read_only guard statement
# because we want the file data to be populated in memory
super().connect()
super().update(self.read())
# now read any metadata from the .json file
try:
self.metadata_store.connect()
metadata = [d for d in self.metadata_store.query()]
except FileNotFoundError:
metadata = []
warnings.warn(
f"""
JSON file '{self.json_name}' not found. To create this file automatically, re-initialize
the FileStore with read_only=False.
"""
)
# merge metadata with file data and check for orphaned metadata
requests = []
found_orphans = False
key = self.key
file_ids = self.distinct(self.key)
for d in metadata:
if isinstance(key, list):
search_doc = {k: d[k] for k in key}
else:
search_doc = {key: d[key]}
if d[key] not in file_ids:
found_orphans = True
d.update({"orphan": True})
del d["_id"]
requests.append(UpdateOne(search_doc, {"$set": d}, upsert=True))
if found_orphans:
warnings.warn(
f"Orphaned metadata was found in {self.json_name}. This metadata"
"will be added to the store with {'orphan': True}"
)
if len(requests) > 0:
self._collection.bulk_write(requests, ordered=False)
query(self, criteria=None, properties=None, sort=None, hint=None, skip=0, limit=0, contents_size_limit=None)
¶
Queries the Store for a set of documents
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
hint | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of indexes to use as hints for query optimizer. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
contents_size_limit | Optional[int] | Maximum file size in bytes for which to return contents. The FileStore will attempt to read the file and populate the 'contents' key with its content at query time, unless the file size is larger than this value. | None |
Source code in maggma/stores/file_store.py
def query( # type: ignore
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
hint: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
contents_size_limit: Optional[int] = None,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
hint: Dictionary of indexes to use as hints for query optimizer.
Keys are field names and values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
contents_size_limit: Maximum file size in bytes for which to return contents.
The FileStore will attempt to read the file and populate the 'contents' key
with its content at query time, unless the file size is larger than this value.
"""
return_contents = False
criteria = criteria if criteria else {}
if criteria.get("orphan", None) is None:
if not self.include_orphans:
criteria.update({"orphan": False})
if criteria.get("contents"):
warnings.warn("'contents' is not a queryable field! Ignoring.")
if isinstance(properties, list):
properties = {p: 1 for p in properties}
orig_properties = properties.copy() if properties else None
if properties is None or properties.get("contents"):
return_contents = True
if properties is not None and return_contents:
# remove contents b/c it isn't stored in the MemoryStore
properties.pop("contents")
# add size and path to query so that file can be read
properties.update({"size": 1})
properties.update({"path": 1})
for d in super().query(
criteria=criteria,
properties=properties,
sort=sort,
hint=hint,
skip=skip,
limit=limit,
):
# add file contents to the returned documents, if appropriate
if return_contents:
if contents_size_limit is None or d["size"] <= contents_size_limit:
# attempt to read the file contents and inject into the document
# TODO - could add more logic for detecting different file types
# and more nuanced exception handling
try:
with zopen(d["path"], "r") as f:
data = f.read()
except Exception as e:
data = f"Unable to read: {e}"
elif d["size"] > contents_size_limit:
data = "Unable to read: file too large"
else:
data = "Unable to read: Unknown error"
d.update({"contents": data})
# remove size and path if not explicitly requested
if orig_properties is not None and "size" not in orig_properties:
d.pop("size")
if orig_properties is not None and "path" not in orig_properties:
d.pop("path")
yield d
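A sketch of querying for file contents while capping the size of files that are read (the 'parent' value is hypothetical):

```python
for doc in fs.query(
    criteria={"parent": "run_01"},        # files in a hypothetical subdirectory
    properties=["name", "contents"],
    contents_size_limit=10_000,           # only read files up to 10 kB
):
    print(doc["name"], doc["contents"][:80])
```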
query_one(self, criteria=None, properties=None, sort=None, contents_size_limit=None)
¶
Queries the Store for a single document
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to search | None |
properties | Union[Dict, List] | properties to return in the document | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
contents_size_limit | Optional[int] | Maximum file size in bytes for which to return contents. The FileStore will attempt to read the file and populate the 'contents' key with its content at query time, unless the file size is larger than this value. | None |
Source code in maggma/stores/file_store.py
def query_one(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
contents_size_limit: Optional[int] = None,
):
"""
Queries the Store for a single document
Args:
criteria: PyMongo filter for documents to search
properties: properties to return in the document
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
contents_size_limit: Maximum file size in bytes for which to return contents.
The FileStore will attempt to read the file and populate the 'contents' key
with its content at query time, unless the file size is larger than this value.
"""
return next(
self.query(
criteria=criteria,
properties=properties,
sort=sort,
contents_size_limit=contents_size_limit,
),
None,
)
read(self)
¶
Iterate through all files in the Store folder and populate the Store with dictionaries containing basic information about each file.
The keys of the documents added to the Store are
name: str = File name
path: Path = Absolute path of this file
parent: str = Name of the parent directory (if any)
!!! file_id "str = Unique identifier for this file, computed from the hash"
of its path relative to the base FileStore directory and
the file creation time. The key of this field is 'file_id'
by default but can be changed via the 'key' kwarg to
FileStore.__init__().
size: int = Size of this file in bytes
last_updated: datetime = Time this file was last modified
hash: str = Hash of the file contents
orphan: bool = Whether this record is an orphan
Source code in maggma/stores/file_store.py
def read(self) -> List[Dict]:
"""
Iterate through all files in the Store folder and populate
the Store with dictionaries containing basic information about each file.
The keys of the documents added to the Store are
name: str = File name
path: Path = Absolute path of this file
parent: str = Name of the parent directory (if any)
file_id: str = Unique identifier for this file, computed from the hash
of its path relative to the base FileStore directory and
the file creation time. The key of this field is 'file_id'
by default but can be changed via the 'key' kwarg to
FileStore.__init__().
size: int = Size of this file in bytes
last_updated: datetime = Time this file was last modified
hash: str = Hash of the file contents
orphan: bool = Whether this record is an orphan
"""
file_list = []
# generate a list of files in subdirectories
for pattern in self.file_filters:
# list every file that matches the pattern
for f in self.path.rglob(pattern):
if f.is_file():
# ignore the .json file created by the Store
if f.name == self.json_name:
continue
# filter based on depth
depth = len(f.relative_to(self.path).parts) - 1
if self.max_depth is None or depth <= self.max_depth:
file_list.append(self._create_record_from_file(f))
return file_list
remove_docs(self, criteria, confirm=False)
¶
Remove items matching the query dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Dict | query dictionary to match | required |
confirm | bool | Boolean flag to confirm that remove_docs should delete files on disk. Default: False. | False |
Source code in maggma/stores/file_store.py
def remove_docs(self, criteria: Dict, confirm: bool = False):
"""
Remove items matching the query dictionary.
Args:
criteria: query dictionary to match
confirm: Boolean flag to confirm that remove_docs should delete
files on disk. Default: False.
"""
if self.read_only:
raise StoreError(
"This Store is read-only. To enable file I/O, re-initialize the "
"store with read_only=False."
)
docs = [d for d in self.query(criteria)]
# this ensures that any modifications to criteria made by self.query
# (e.g., related to orphans or contents) are propagated through to the superclass
new_criteria = {"file_id": {"$in": [d["file_id"] for d in docs]}}
if len(docs) > 0 and not confirm:
raise StoreError(
f"Warning! This command is about to delete {len(docs)} items from disk! "
"If this is what you want, reissue this command with confirm=True."
)
for d in docs:
Path(d["path"]).unlink()
super().remove_docs(criteria=new_criteria)
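As a sketch (the 'test_name' field is hypothetical metadata added earlier), deleting the matching files from disk requires both read_only=False and an explicit confirm=True:

```python
# Deletes the matching files from disk and removes their records from the store.
fs.remove_docs({"test_name": "april_fool"}, confirm=True)
```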
update(self, docs, key=None)
¶
Update items in the Store. Only possible if the store is not read only. Any new fields that are added will be written to the JSON file in the root directory of the FileStore.
Note that certain fields that come from file metadata on disk are protected and cannot be updated with this method. This prevents the contents of the FileStore from becoming out of sync with the files on which it is based. The protected fields are keys in the dict returned by _create_record_from_file, e.g. 'name', 'parent', 'path', 'last_updated', 'hash', 'size', 'contents', and 'orphan'. The 'path_relative' and key fields are retained to make each document in the JSON file identifiable by manual inspection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docs | Union[List[Dict], Dict] | the document or list of documents to update | required |
key | Union[List, str] | field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used | None |
Source code in maggma/stores/file_store.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update items in the Store. Only possible if the store is not read only. Any new
fields that are added will be written to the JSON file in the root directory
of the FileStore.
Note that certain fields that come from file metadata on disk are protected and
cannot be updated with this method. This prevents the contents of the FileStore
from becoming out of sync with the files on which it is based. The protected fields
are keys in the dict returned by _create_record_from_file, e.g. 'name', 'parent',
'path', 'last_updated', 'hash', 'size', 'contents', and 'orphan'. The 'path_relative' and key fields are
retained to make each document in the JSON file identifiable by manual inspection.
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
if self.read_only:
raise StoreError(
"This Store is read-only. To enable file I/O, re-initialize the store with read_only=False."
)
super().update(docs, key)
data = [d for d in self.query()]
filtered_data = []
# remove fields that are populated by .read()
for d in data:
filtered_d = self._filter_data(d)
# don't write records that contain only file_id
if (
len(set(filtered_d.keys()).difference(set(["path_relative", self.key])))
!= 0
):
filtered_data.append(filtered_d)
self.metadata_store.update(filtered_data, self.key)
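A sketch of attaching a custom field to a single record (requires read_only=False; the 'operator' field is hypothetical). Protected file-derived fields are stripped before the metadata is written to FileStore.json:

```python
doc = fs.query_one({"name": "2022-04-01_april_fool_experiment.txt"})
doc["operator"] = "ada"   # hypothetical user-defined metadata
fs.update(doc)            # also persists the new field to FileStore.json
```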
Module containing various definitions of Stores. Stores are a default access pattern to data and provide various utilities
GridFSStore (Store)
¶
A Store for GridFS backend. Provides a common access method consistent with other stores
Source code in maggma/stores/gridfs.py
class GridFSStore(Store):
"""
A Store for GridFS backend. Provides a common access method consistent with other stores
"""
def __init__(
self,
database: str,
collection_name: str,
host: str = "localhost",
port: int = 27017,
username: str = "",
password: str = "",
compression: bool = False,
ensure_metadata: bool = False,
searchable_fields: List[str] = None,
auth_source: Optional[str] = None,
mongoclient_kwargs: Optional[Dict] = None,
ssh_tunnel: Optional[SSHTunnel] = None,
**kwargs,
):
"""
Initializes a GridFS Store for binary data
Args:
database: database name
collection_name: The name of the collection.
This is the string portion before the GridFS extensions
host: hostname for the database
port: port to connect to
username: username to connect as
password: password to authenticate as
compression: compress the data as it goes into GridFS
ensure_metadata: ensure returned documents have the metadata fields
searchable_fields: fields to keep in the index store
auth_source: The database to authenticate on. Defaults to the database name.
ssh_tunnel: An SSHTunnel object to use.
"""
self.database = database
self.collection_name = collection_name
self.host = host
self.port = port
self.username = username
self.password = password
self._coll: Any = None
self.compression = compression
self.ensure_metadata = ensure_metadata
self.searchable_fields = [] if searchable_fields is None else searchable_fields
self.kwargs = kwargs
self.ssh_tunnel = ssh_tunnel
if auth_source is None:
auth_source = self.database
self.auth_source = auth_source
self.mongoclient_kwargs = mongoclient_kwargs or {}
if "key" not in kwargs:
kwargs["key"] = "_id"
super().__init__(**kwargs)
@classmethod
def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
"""
Convenience method to construct a GridFSStore from a launchpad file
Note: A launchpad file is a specially formatted yaml file used in fireworks
Returns:
"""
with open(lp_file, "r") as f:
lp_creds = yaml.safe_load(f.read())
db_creds = lp_creds.copy()
db_creds["database"] = db_creds["name"]
for key in list(db_creds.keys()):
if key not in ["database", "host", "port", "username", "password"]:
db_creds.pop(key)
db_creds["collection_name"] = collection_name
return cls(**db_creds, **kwargs)
@property
def name(self) -> str:
"""
Return a string representing this data source
"""
return f"gridfs://{self.host}/{self.database}/{self.collection_name}"
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if self.ssh_tunnel is None:
host = self.host
port = self.port
else:
self.ssh_tunnel.start()
host, port = self.ssh_tunnel.local_address
conn: MongoClient = (
MongoClient(
host=host,
port=port,
username=self.username,
password=self.password,
authSource=self.auth_source,
**self.mongoclient_kwargs,
)
if self.username != ""
else MongoClient(host, port, **self.mongoclient_kwargs)
)
if not self._coll or force_reset:
db = conn[self.database]
self._coll = gridfs.GridFS(db, self.collection_name)
self._files_collection = db["{}.files".format(self.collection_name)]
self._files_store = MongoStore.from_collection(self._files_collection)
self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
self._files_store.key = self.key
self._chunks_collection = db["{}.chunks".format(self.collection_name)]
@property
def _collection(self):
"""Property referring to underlying pymongo collection"""
if self._coll is None:
raise StoreError(
"Must connect Mongo-like store before attempting to use it"
)
return self._coll
@property
def last_updated(self) -> datetime:
"""
Provides the most recent last_updated date time stamp from
the documents in this Store
"""
return self._files_store.last_updated
@classmethod
def transform_criteria(cls, criteria: Dict) -> Dict:
"""
Allow client to not need to prepend 'metadata.' to query fields.
Args:
criteria: Query criteria
"""
new_criteria = dict()
for field in criteria:
if field not in files_collection_fields and not field.startswith(
"metadata."
):
new_criteria["metadata." + field] = copy.copy(criteria[field])
else:
new_criteria[field] = copy.copy(criteria[field])
return new_criteria
def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
"""
if isinstance(criteria, dict):
criteria = self.transform_criteria(criteria)
return self._files_store.count(criteria)
def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the GridFS Store for a set of documents.
Will check to see if data can be returned from
files store first.
If the data from the gridfs is not a json serialized string
a dict will be returned with the data in the "data" key
plus the self.key and self.last_updated_field.
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
"""
if isinstance(criteria, dict):
criteria = self.transform_criteria(criteria)
elif criteria is not None:
raise ValueError("Criteria must be a dictionary or None")
prop_keys = set()
if isinstance(properties, dict):
prop_keys = set(properties.keys())
elif isinstance(properties, list):
prop_keys = set(properties)
for doc in self._files_store.query(
criteria=criteria, sort=sort, limit=limit, skip=skip
):
if properties is not None and prop_keys.issubset(set(doc.keys())):
yield {p: doc[p] for p in properties if p in doc}
else:
metadata = doc.get("metadata", {})
data = self._collection.find_one(
filter={"_id": doc["_id"]},
skip=skip,
limit=limit,
sort=sort,
).read()
if metadata.get("compression", "") == "zlib":
data = zlib.decompress(data).decode("UTF-8")
try:
data = json.loads(data)
except Exception:
if not isinstance(data, dict):
data = {
"data": data,
self.key: doc.get(self.key),
self.last_updated_field: doc.get(self.last_updated_field),
}
if self.ensure_metadata and isinstance(data, dict):
data.update(metadata)
yield data
def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
"""
Get all distinct values for a field. This function only operates
on the metadata in the files collection
Args:
field: the field(s) to get distinct values for
criteria: PyMongo filter for documents to search in
"""
criteria = (
self.transform_criteria(criteria)
if isinstance(criteria, dict)
else criteria
)
field = (
f"metadata.{field}"
if field not in files_collection_fields
and not field.startswith("metadata.")
else field
)
return self._files_store.distinct(field=field, criteria=criteria)
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys. Will only work if the keys are included in the files
collection for GridFS
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (dict, list of docs)
"""
criteria = (
self.transform_criteria(criteria)
if isinstance(criteria, dict)
else criteria
)
keys = [keys] if not isinstance(keys, list) else keys
keys = [
f"metadata.{k}"
if k not in files_collection_fields and not k.startswith("metadata.")
else k
for k in keys
]
for group, ids in self._files_store.groupby(
keys, criteria=criteria, properties=[f"metadata.{self.key}"]
):
ids = [
get(doc, f"metadata.{self.key}")
for doc in ids
if has(doc, f"metadata.{self.key}")
]
group = {
k.replace("metadata.", ""): get(group, k) for k in keys if has(group, k)
}
yield group, list(self.query(criteria={self.key: {"$in": ids}}))
def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
"""
Tries to create an index and return true if it succeeded
Currently operates on the GridFS files collection
Args:
key: single key to index
unique: Whether or not this index contains only unique keys
Returns:
bool indicating if the index exists/was created
"""
# Transform key for gridfs first
if key not in files_collection_fields:
files_col_key = "metadata.{}".format(key)
return self._files_store.ensure_index(files_col_key, unique=unique)
else:
return self._files_store.ensure_index(key, unique=unique)
def update(
self,
docs: Union[List[Dict], Dict],
key: Union[List, str, None] = None,
additional_metadata: Union[str, List[str], None] = None,
):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
additional_metadata: field(s) to include in the gridfs metadata
"""
if not isinstance(docs, list):
docs = [docs]
if isinstance(key, str):
key = [key]
elif not key:
key = [self.key]
key = list(set(key) - set(files_collection_fields))
if additional_metadata is None:
additional_metadata = []
elif isinstance(additional_metadata, str):
additional_metadata = [additional_metadata]
else:
additional_metadata = list(additional_metadata)
for d in docs:
search_doc = {k: d[k] for k in key}
metadata = {
k: get(d, k)
for k in [self.last_updated_field]
+ additional_metadata
+ self.searchable_fields
if has(d, k)
}
metadata.update(search_doc)
data = json.dumps(jsanitize(d)).encode("UTF-8")
if self.compression:
data = zlib.compress(data)
metadata["compression"] = "zlib"
self._collection.put(data, metadata=metadata)
search_doc = self.transform_criteria(search_doc)
# Cleans up old gridfs entries
for fdoc in (
self._files_collection.find(search_doc, ["_id"])
.sort("uploadDate", -1)
.skip(1)
):
self._collection.delete(fdoc["_id"])
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
"""
if isinstance(criteria, dict):
criteria = self.transform_criteria(criteria)
ids = [cursor._id for cursor in self._collection.find(criteria)]
for _id in ids:
self._collection.delete(_id)
def close(self):
self._files_store.close()
self._coll = None
if self.ssh_tunnel is not None:
self.ssh_tunnel.stop()
def __eq__(self, other: object) -> bool:
"""
Check equality for GridFSStore
other: other GridFSStore to compare with
"""
if not isinstance(other, GridFSStore):
return False
fields = ["database", "collection_name", "host", "port"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
last_updated: datetime
property
readonly
¶
Provides the most recent last_updated date time stamp from the documents in this Store
name: str
property
readonly
¶
Return a string representing this data source
__eq__(self, other)
special
¶
Check equality for GridFSStore against another GridFSStore, based on the database, collection_name, host, and port fields.
Source code in maggma/stores/gridfs.py
def __eq__(self, other: object) -> bool:
"""
Check equality for GridFSStore
other: other GridFSStore to compare with
"""
if not isinstance(other, GridFSStore):
return False
fields = ["database", "collection_name", "host", "port"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__init__(self, database, collection_name, host='localhost', port=27017, username='', password='', compression=False, ensure_metadata=False, searchable_fields=None, auth_source=None, mongoclient_kwargs=None, ssh_tunnel=None, **kwargs)
special
¶
Initializes a GridFS Store for binary data
Parameters:
Name | Type | Description | Default |
---|---|---|---|
database | str | database name | required |
collection_name | str | The name of the collection. This is the string portion before the GridFS extensions | required |
host | str | hostname for the database | 'localhost' |
port | int | port to connect to | 27017 |
username | str | username to connect as | '' |
password | str | password to authenticate as | '' |
compression | bool | compress the data as it goes into GridFS | False |
ensure_metadata | bool | ensure returned documents have the metadata fields | False |
searchable_fields | List[str] | fields to keep in the index store | None |
auth_source | Optional[str] | The database to authenticate on. Defaults to the database name. | None |
ssh_tunnel | Optional[maggma.stores.mongolike.SSHTunnel] | An SSHTunnel object to use. | None |
Source code in maggma/stores/gridfs.py
def __init__(
self,
database: str,
collection_name: str,
host: str = "localhost",
port: int = 27017,
username: str = "",
password: str = "",
compression: bool = False,
ensure_metadata: bool = False,
searchable_fields: List[str] = None,
auth_source: Optional[str] = None,
mongoclient_kwargs: Optional[Dict] = None,
ssh_tunnel: Optional[SSHTunnel] = None,
**kwargs,
):
"""
Initializes a GrdiFS Store for binary data
Args:
database: database name
collection_name: The name of the collection.
This is the string portion before the GridFS extensions
host: hostname for the database
port: port to connect to
username: username to connect as
password: password to authenticate as
compression: compress the data as it goes into GridFS
ensure_metadata: ensure returned documents have the metadata fields
searchable_fields: fields to keep in the index store
auth_source: The database to authenticate on. Defaults to the database name.
ssh_tunnel: An SSHTunnel object to use.
"""
self.database = database
self.collection_name = collection_name
self.host = host
self.port = port
self.username = username
self.password = password
self._coll: Any = None
self.compression = compression
self.ensure_metadata = ensure_metadata
self.searchable_fields = [] if searchable_fields is None else searchable_fields
self.kwargs = kwargs
self.ssh_tunnel = ssh_tunnel
if auth_source is None:
auth_source = self.database
self.auth_source = auth_source
self.mongoclient_kwargs = mongoclient_kwargs or {}
if "key" not in kwargs:
kwargs["key"] = "_id"
super().__init__(**kwargs)
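A minimal usage sketch is shown below. The database, collection, and key names are illustrative, and a MongoDB instance is assumed to be reachable at the default host and port.

```python
from maggma.stores.gridfs import GridFSStore

# Hypothetical database/collection names; documents are keyed on "task_id"
store = GridFSStore(
    database="my_db",
    collection_name="structures",
    host="localhost",
    port=27017,
    key="task_id",
    compression=True,  # zlib-compress payloads before they go into GridFS
)

store.connect()
store.update([{"task_id": "mp-1", "data": list(range(10000))}])
print(store.count({"task_id": "mp-1"}))  # criteria fields are auto-prefixed with "metadata."
store.close()
```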
close(self)
¶
Closes any connections
Source code in maggma/stores/gridfs.py
def close(self):
self._files_store.close()
self._coll = None
if self.ssh_tunnel is not None:
self.ssh_tunnel.stop()
connect(self, force_reset=False)
¶
Connect to the source data
Source code in maggma/stores/gridfs.py
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if self.ssh_tunnel is None:
host = self.host
port = self.port
else:
self.ssh_tunnel.start()
host, port = self.ssh_tunnel.local_address
conn: MongoClient = (
MongoClient(
host=host,
port=port,
username=self.username,
password=self.password,
authSource=self.auth_source,
**self.mongoclient_kwargs,
)
if self.username != ""
else MongoClient(host, port, **self.mongoclient_kwargs)
)
if not self._coll or force_reset:
db = conn[self.database]
self._coll = gridfs.GridFS(db, self.collection_name)
self._files_collection = db["{}.files".format(self.collection_name)]
self._files_store = MongoStore.from_collection(self._files_collection)
self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
self._files_store.key = self.key
self._chunks_collection = db["{}.chunks".format(self.collection_name)]
count(self, criteria=None)
¶
Counts the number of documents matching the query criteria
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to count in | None |
Source code in maggma/stores/gridfs.py
def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
"""
if isinstance(criteria, dict):
criteria = self.transform_criteria(criteria)
return self._files_store.count(criteria)
distinct(self, field, criteria=None, all_exist=False)
¶
Get all distinct values for a field. This function only operates on the metadata in the files collection
Parameters:
Name | Type | Description | Default |
---|---|---|---|
field | str | the field(s) to get distinct values for | required |
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
Source code in maggma/stores/gridfs.py
def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
"""
Get all distinct values for a field. This function only operates
on the metadata in the files collection
Args:
field: the field(s) to get distinct values for
criteria: PyMongo filter for documents to search in
"""
criteria = (
self.transform_criteria(criteria)
if isinstance(criteria, dict)
else criteria
)
field = (
f"metadata.{field}"
if field not in files_collection_fields
and not field.startswith("metadata.")
else field
)
return self._files_store.distinct(field=field, criteria=criteria)
ensure_index(self, key, unique=False)
¶
Tries to create an index and returns true if it succeeded. Currently operates on the GridFS files collection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | str | single key to index | required |
unique | Optional[bool] | Whether or not this index contains only unique keys | False |
Returns:
Type | Description |
---|---|
bool | bool indicating if the index exists/was created |
Source code in maggma/stores/gridfs.py
def ensure_index(self, key: str, unique: Optional[bool] = False) -> bool:
"""
Tries to create an index and return true if it succeeded
Currently operators on the GridFS files collection
Args:
key: single key to index
unique: Whether or not this index contains only unique keys
Returns:
bool indicating if the index exists/was created
"""
# Transform key for gridfs first
if key not in files_collection_fields:
files_col_key = "metadata.{}".format(key)
return self._files_store.ensure_index(files_col_key, unique=unique)
else:
return self._files_store.ensure_index(key, unique=unique)
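For example, continuing the hypothetical store above, a metadata field is indexed by its bare name and the "metadata." prefix is added automatically, while native files-collection fields are indexed as-is:

```python
# Creates an index on "metadata.task_id" in the <collection_name>.files collection
store.ensure_index("task_id", unique=True)

# "uploadDate" is a native GridFS files-collection field, so it is indexed unchanged
store.ensure_index("uploadDate")
```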
from_launchpad_file(lp_file, collection_name, **kwargs)
classmethod
¶
Convenience method to construct a GridFSStore from a launchpad file
Note: A launchpad file is a specially formatted YAML file used in fireworks
Source code in maggma/stores/gridfs.py
@classmethod
def from_launchpad_file(cls, lp_file, collection_name, **kwargs):
"""
Convenience method to construct a GridFSStore from a launchpad file
Note: A launchpad file is a special formatted yaml file used in fireworks
Returns:
"""
with open(lp_file, "r") as f:
lp_creds = yaml.safe_load(f.read())
db_creds = lp_creds.copy()
db_creds["database"] = db_creds["name"]
for key in list(db_creds.keys()):
if key not in ["database", "host", "port", "username", "password"]:
db_creds.pop(key)
db_creds["collection_name"] = collection_name
return cls(**db_creds, **kwargs)
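A hedged sketch of how this might be used; the file name and the YAML contents shown in the comment are illustrative of a typical fireworks launchpad file:

```python
# my_launchpad.yaml might contain, for example:
#   host: localhost
#   port: 27017
#   name: fireworks_db      # mapped onto the "database" argument
#   username: fw_user
#   password: fw_pass
store = GridFSStore.from_launchpad_file("my_launchpad.yaml", "structures")
store.connect()
```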
groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Simple grouping function that will group documents by keys. Will only work if the keys are included in the files collection for GridFS
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys | Union[List[str], str] | fields to group documents | required |
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
Returns:
Type | Description |
---|---|
Iterator[Tuple[Dict, List[Dict]]] | generator returning tuples of (dict, list of docs) |
Source code in maggma/stores/gridfs.py
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys. Will only work if the keys are included in the files
collection for GridFS
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (dict, list of docs)
"""
criteria = (
self.transform_criteria(criteria)
if isinstance(criteria, dict)
else criteria
)
keys = [keys] if not isinstance(keys, list) else keys
keys = [
f"metadata.{k}"
if k not in files_collection_fields and not k.startswith("metadata.")
else k
for k in keys
]
for group, ids in self._files_store.groupby(
keys, criteria=criteria, properties=[f"metadata.{self.key}"]
):
ids = [
get(doc, f"metadata.{self.key}")
for doc in ids
if has(doc, f"metadata.{self.key}")
]
group = {
k.replace("metadata.", ""): get(group, k) for k in keys if has(group, k)
}
yield group, list(self.query(criteria={self.key: {"$in": ids}}))
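A sketch of grouping on a metadata field, continuing the hypothetical store above; the documents are inserted with additional_metadata so that the grouping field lives in the files collection:

```python
store.update(
    [
        {"task_id": "mp-1", "formula": "Fe2O3", "data": [1, 2, 3]},
        {"task_id": "mp-2", "formula": "Fe2O3", "data": [4, 5, 6]},
        {"task_id": "mp-3", "formula": "SiO2", "data": [7, 8, 9]},
    ],
    additional_metadata="formula",
)

# Each group dict carries the bare field name; the full documents are pulled back from GridFS
for group, docs in store.groupby("formula"):
    print(group["formula"], [d["task_id"] for d in docs])
```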
query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Queries the GridFS Store for a set of documents. Will check to see if the data can be returned from the files store first. If the data from GridFS is not a JSON-serialized string, a dict will be returned with the data under the "data" key plus the self.key and self.last_updated_field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
Source code in maggma/stores/gridfs.py
def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the GridFS Store for a set of documents.
Will check to see if data can be returned from
files store first.
If the data from the gridfs is not a json serialized string
a dict will be returned with the data in the "data" key
plus the self.key and self.last_updated_field.
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
"""
if isinstance(criteria, dict):
criteria = self.transform_criteria(criteria)
elif criteria is not None:
raise ValueError("Criteria must be a dictionary or None")
prop_keys = set()
if isinstance(properties, dict):
prop_keys = set(properties.keys())
elif isinstance(properties, list):
prop_keys = set(properties)
for doc in self._files_store.query(
criteria=criteria, sort=sort, limit=limit, skip=skip
):
if properties is not None and prop_keys.issubset(set(doc.keys())):
yield {p: doc[p] for p in properties if p in doc}
else:
metadata = doc.get("metadata", {})
data = self._collection.find_one(
filter={"_id": doc["_id"]},
skip=skip,
limit=limit,
sort=sort,
).read()
if metadata.get("compression", "") == "zlib":
data = zlib.decompress(data).decode("UTF-8")
try:
data = json.loads(data)
except Exception:
if not isinstance(data, dict):
data = {
"data": data,
self.key: doc.get(self.key),
self.last_updated_field: doc.get(self.last_updated_field),
}
if self.ensure_metadata and isinstance(data, dict):
data.update(metadata)
yield data
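A short illustration, continuing the sketch above: criteria are written without the "metadata." prefix, and full documents come back from GridFS unless the requested properties are already available in the files collection:

```python
# Only the index metadata is needed, so no GridFS read happens here
for doc in store.query({"formula": "Fe2O3"}, properties=["metadata", "_id"]):
    print(doc["metadata"]["task_id"])

# Pull back the full documents stored in GridFS
for doc in store.query({"formula": "Fe2O3"}):
    print(doc["task_id"], len(doc["data"]))
```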
remove_docs(self, criteria)
¶
Remove docs matching the query dictionary
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Dict | query dictionary to match | required |
Source code in maggma/stores/gridfs.py
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
"""
if isinstance(criteria, dict):
criteria = self.transform_criteria(criteria)
ids = [cursor._id for cursor in self._collection.find(criteria)]
for _id in ids:
self._collection.delete(_id)
transform_criteria(criteria)
classmethod
¶
Allow client to not need to prepend 'metadata.' to query fields.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Dict | Query criteria | required |
Source code in maggma/stores/gridfs.py
@classmethod
def transform_criteria(cls, criteria: Dict) -> Dict:
"""
Allow client to not need to prepend 'metadata.' to query fields.
Args:
criteria: Query criteria
"""
new_criteria = dict()
for field in criteria:
if field not in files_collection_fields and not field.startswith(
"metadata."
):
new_criteria["metadata." + field] = copy.copy(criteria[field])
else:
new_criteria[field] = copy.copy(criteria[field])
return new_criteria
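For example, with a hypothetical query: fields that are not native GridFS files-collection fields get the "metadata." prefix, while native ones such as uploadDate are left untouched:

```python
from datetime import datetime

criteria = {"task_id": "mp-1", "uploadDate": {"$gte": datetime(2023, 1, 1)}}
print(GridFSStore.transform_criteria(criteria))
# {'metadata.task_id': 'mp-1', 'uploadDate': {'$gte': datetime.datetime(2023, 1, 1, 0, 0)}}
```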
update(self, docs, key=None, additional_metadata=None)
¶
Update documents into the Store
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docs | Union[List[Dict], Dict] | the document or list of documents to update | required |
key | Union[List, str] | field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used | None |
additional_metadata | Union[str, List[str]] | field(s) to include in the gridfs metadata | None |
Source code in maggma/stores/gridfs.py
def update(
self,
docs: Union[List[Dict], Dict],
key: Union[List, str, None] = None,
additional_metadata: Union[str, List[str], None] = None,
):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
additional_metadata: field(s) to include in the gridfs metadata
"""
if not isinstance(docs, list):
docs = [docs]
if isinstance(key, str):
key = [key]
elif not key:
key = [self.key]
key = list(set(key) - set(files_collection_fields))
if additional_metadata is None:
additional_metadata = []
elif isinstance(additional_metadata, str):
additional_metadata = [additional_metadata]
else:
additional_metadata = list(additional_metadata)
for d in docs:
search_doc = {k: d[k] for k in key}
metadata = {
k: get(d, k)
for k in [self.last_updated_field]
+ additional_metadata
+ self.searchable_fields
if has(d, k)
}
metadata.update(search_doc)
data = json.dumps(jsanitize(d)).encode("UTF-8")
if self.compression:
data = zlib.compress(data)
metadata["compression"] = "zlib"
self._collection.put(data, metadata=metadata)
search_doc = self.transform_criteria(search_doc)
# Cleans up old gridfs entries
for fdoc in (
self._files_collection.find(search_doc, ["_id"])
.sort("uploadDate", -1)
.skip(1)
):
self._collection.delete(fdoc["_id"])
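As the clean-up loop above suggests, re-updating a document with the same key replaces the previous GridFS entry rather than accumulating copies. A small sketch with hypothetical key values:

```python
store.update({"task_id": "mp-1", "data": [1, 2, 3]})
store.update({"task_id": "mp-1", "data": [4, 5, 6]})  # supersedes the previous entry

# Only the newest GridFS file for "mp-1" is kept; older ones are deleted
print(store.count({"task_id": "mp-1"}))  # 1
```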
GridFSURIStore (GridFSStore)
¶
A Store for GridFS backend, with connection via a mongo URI string.
This is expected to be a special mongodb+srv:// URI that includes client parameters via TXT records
Source code in maggma/stores/gridfs.py
class GridFSURIStore(GridFSStore):
"""
A Store for GridFS backend, with connection via a mongo URI string.
This is expected to be a special mongodb+srv:// URIs that include client parameters
via TXT records
"""
def __init__(
self,
uri: str,
collection_name: str,
database: str = None,
compression: bool = False,
ensure_metadata: bool = False,
searchable_fields: List[str] = None,
mongoclient_kwargs: Optional[Dict] = None,
**kwargs,
):
"""
Initializes a GrdiFS Store for binary data
Args:
uri: MongoDB+SRV URI
database: database to connect to
collection_name: The collection name
compression: compress the data as it goes into GridFS
ensure_metadata: ensure returned documents have the metadata fields
searchable_fields: fields to keep in the index store
"""
self.uri = uri
# parse the dbname from the uri
if database is None:
d_uri = uri_parser.parse_uri(uri)
if d_uri["database"] is None:
raise ConfigurationError(
"If database name is not supplied, a database must be set in the uri"
)
self.database = d_uri["database"]
else:
self.database = database
self.collection_name = collection_name
self._coll: Any = None
self.compression = compression
self.ensure_metadata = ensure_metadata
self.searchable_fields = [] if searchable_fields is None else searchable_fields
self.kwargs = kwargs
self.mongoclient_kwargs = mongoclient_kwargs or {}
if "key" not in kwargs:
kwargs["key"] = "_id"
super(GridFSStore, self).__init__(**kwargs) # lgtm
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if not self._coll or force_reset: # pragma: no cover
conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
db = conn[self.database]
self._coll = gridfs.GridFS(db, self.collection_name)
self._files_collection = db["{}.files".format(self.collection_name)]
self._files_store = MongoStore.from_collection(self._files_collection)
self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
self._files_store.key = self.key
self._chunks_collection = db["{}.chunks".format(self.collection_name)]
__init__(self, uri, collection_name, database=None, compression=False, ensure_metadata=False, searchable_fields=None, mongoclient_kwargs=None, **kwargs)
special
¶
Initializes a GridFS Store for binary data
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri | str | MongoDB+SRV URI | required |
database | str | database to connect to | None |
collection_name | str | The collection name | required |
compression | bool | compress the data as it goes into GridFS | False |
ensure_metadata | bool | ensure returned documents have the metadata fields | False |
searchable_fields | List[str] | fields to keep in the index store | None |
Source code in maggma/stores/gridfs.py
def __init__(
self,
uri: str,
collection_name: str,
database: str = None,
compression: bool = False,
ensure_metadata: bool = False,
searchable_fields: List[str] = None,
mongoclient_kwargs: Optional[Dict] = None,
**kwargs,
):
"""
Initializes a GrdiFS Store for binary data
Args:
uri: MongoDB+SRV URI
database: database to connect to
collection_name: The collection name
compression: compress the data as it goes into GridFS
ensure_metadata: ensure returned documents have the metadata fields
searchable_fields: fields to keep in the index store
"""
self.uri = uri
# parse the dbname from the uri
if database is None:
d_uri = uri_parser.parse_uri(uri)
if d_uri["database"] is None:
raise ConfigurationError(
"If database name is not supplied, a database must be set in the uri"
)
self.database = d_uri["database"]
else:
self.database = database
self.collection_name = collection_name
self._coll: Any = None
self.compression = compression
self.ensure_metadata = ensure_metadata
self.searchable_fields = [] if searchable_fields is None else searchable_fields
self.kwargs = kwargs
self.mongoclient_kwargs = mongoclient_kwargs or {}
if "key" not in kwargs:
kwargs["key"] = "_id"
super(GridFSStore, self).__init__(**kwargs) # lgtm
connect(self, force_reset=False)
¶
Connect to the source data
Source code in maggma/stores/gridfs.py
def connect(self, force_reset: bool = False):
"""
Connect to the source data
"""
if not self._coll or force_reset: # pragma: no cover
conn: MongoClient = MongoClient(self.uri, **self.mongoclient_kwargs)
db = conn[self.database]
self._coll = gridfs.GridFS(db, self.collection_name)
self._files_collection = db["{}.files".format(self.collection_name)]
self._files_store = MongoStore.from_collection(self._files_collection)
self._files_store.last_updated_field = f"metadata.{self.last_updated_field}"
self._files_store.key = self.key
self._chunks_collection = db["{}.chunks".format(self.collection_name)]
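A minimal sketch, assuming a placeholder SRV connection string with the database name embedded in the URI path:

```python
from maggma.stores.gridfs import GridFSURIStore

store = GridFSURIStore(
    uri="mongodb+srv://user:password@cluster0.example.mongodb.net/my_db",
    collection_name="structures",
    key="task_id",
)
store.connect()  # the database name "my_db" is parsed from the URI
```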
Advanced Stores for connecting to AWS data
S3Store (Store)
¶
GridFS-like storage using Amazon S3 and a regular store for indexing. Assumes the Amazon AWS key and secret key are set in the environment or a default config file.
Source code in maggma/stores/aws.py
class S3Store(Store):
"""
GridFS like storage using Amazon S3 and a regular store for indexing
Assumes Amazon AWS key and secret key are set in environment or default config file
"""
def __init__(
self,
index: Store,
bucket: str,
s3_profile: Optional[Union[str, dict]] = None,
compress: bool = False,
endpoint_url: str = None,
sub_dir: str = None,
s3_workers: int = 1,
s3_resource_kwargs: Optional[dict] = None,
key: str = "fs_id",
store_hash: bool = True,
unpack_data: bool = True,
searchable_fields: Optional[List[str]] = None,
**kwargs,
):
"""
Initializes an S3 Store
Args:
index: a store to use to index the S3 Bucket
bucket: name of the bucket
s3_profile: name of aws profile containing credentials for role.
Alternatively you can pass in a dictionary with the full credentials:
aws_access_key_id (string) -- AWS access key ID
aws_secret_access_key (string) -- AWS secret access key
aws_session_token (string) -- AWS temporary session token
region_name (string) -- Default region when creating new connections
compress: compress files inserted into the store
endpoint_url: endpoint_url to allow interface to minio service
sub_dir: (optional) subdirectory of the s3 bucket to store the data
s3_workers: number of concurrent S3 puts to run
store_hash: store the sha1 hash right before insertion to the database.
unpack_data: whether to decompress and unpack byte data when querying from the bucket.
searchable_fields: fields to keep in the index store
"""
if boto3 is None:
raise RuntimeError("boto3 and botocore are required for S3Store")
self.index = index
self.bucket = bucket
self.s3_profile = s3_profile
self.compress = compress
self.endpoint_url = endpoint_url
self.sub_dir = sub_dir.strip("/") + "/" if sub_dir else ""
self.s3 = None # type: Any
self.s3_bucket = None # type: Any
self.s3_workers = s3_workers
self.s3_resource_kwargs = (
s3_resource_kwargs if s3_resource_kwargs is not None else {}
)
self.unpack_data = unpack_data
self.searchable_fields = (
searchable_fields if searchable_fields is not None else []
)
self.store_hash = store_hash
# Force the key to be the same as the index
assert isinstance(
index.key, str
), "Since we are using the key as a file name in S3, they key must be a string"
if key != index.key:
warnings.warn(
f'The desired S3Store key "{key}" does not match the index key "{index.key},"'
"the index key will be used",
UserWarning,
)
kwargs["key"] = str(index.key)
self._thread_local = threading.local()
super(S3Store, self).__init__(**kwargs)
@property
def name(self) -> str:
"""
Returns:
a string representing this data source
"""
return f"s3://{self.bucket}"
def connect(self, *args, **kwargs): # lgtm[py/conflicting-attributes]
"""
Connect to the source data
"""
session = self._get_session()
resource = session.resource(
"s3", endpoint_url=self.endpoint_url, **self.s3_resource_kwargs
)
if not self.s3:
self.s3 = resource
try:
self.s3.meta.client.head_bucket(Bucket=self.bucket)
except ClientError:
raise RuntimeError("Bucket not present on AWS: {}".format(self.bucket))
self.s3_bucket = resource.Bucket(self.bucket)
self.index.connect(*args, **kwargs)
def close(self):
"""
Closes any connections
"""
self.index.close()
self.s3 = None
self.s3_bucket = None
@property
def _collection(self):
"""
Returns:
a handle to the pymongo collection object
Important:
Not guaranteed to exist in the future
"""
# For now returns the index collection since that is what we would "search" on
return self.index._collection
def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
"""
return self.index.count(criteria)
def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
"""
prop_keys = set()
if isinstance(properties, dict):
prop_keys = set(properties.keys())
elif isinstance(properties, list):
prop_keys = set(properties)
for doc in self.index.query(
criteria=criteria, sort=sort, limit=limit, skip=skip
):
if properties is not None and prop_keys.issubset(set(doc.keys())):
yield {p: doc[p] for p in properties if p in doc}
else:
try:
# TODO: THis is ugly and unsafe, do some real checking before pulling data
data = (
self.s3_bucket.Object(self.sub_dir + str(doc[self.key]))
.get()["Body"]
.read()
)
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the object does not exist.
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
self.logger.error(
"Could not find S3 object {}".format(doc[self.key])
)
break
else:
raise e
if self.unpack_data:
data = self._unpack(
data=data, compressed=doc.get("compression", "") == "zlib"
)
if self.last_updated_field in doc:
data[self.last_updated_field] = doc[self.last_updated_field]
yield data
@staticmethod
def _unpack(data: bytes, compressed: bool):
if compressed:
data = zlib.decompress(data)
# requires msgpack-python to be installed to fix string encoding problem
# https://github.com/msgpack/msgpack/issues/121
# During recursion
# msgpack.unpackb goes as deep as possible during reconstruction
# MontyDecoder().process_decode only goes until it finds a from_dict
# as such, we cannot just use msgpack.unpackb(data, object_hook=monty_object_hook, raw=False)
# Should just return the unpacked object then let the user run process_decoded
unpacked_data = msgpack.unpackb(data, raw=False)
return unpacked_data
def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
"""
Get all distinct values for a field
Args:
field: the field(s) to get distinct values for
criteria: PyMongo filter for documents to search in
"""
# Index is a store so it should have its own distinct function
return self.index.distinct(field, criteria=criteria)
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (dict, list of docs)
"""
return self.index.groupby(
keys=keys,
criteria=criteria,
properties=properties,
sort=sort,
skip=skip,
limit=limit,
)
def ensure_index(self, key: str, unique: bool = False) -> bool:
"""
Tries to create an index and return true if it suceeded
Args:
key: single key to index
unique: Whether or not this index contains only unique keys
Returns:
bool indicating if the index exists/was created
"""
return self.index.ensure_index(key, unique=unique)
def update(
self,
docs: Union[List[Dict], Dict],
key: Union[List, str, None] = None,
additional_metadata: Union[str, List[str], None] = None,
):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
additional_metadata: field(s) to include in the s3 store's metadata
"""
if not isinstance(docs, list):
docs = [docs]
if isinstance(key, str):
key = [key]
elif not key:
key = [self.key]
if additional_metadata is None:
additional_metadata = []
elif isinstance(additional_metadata, str):
additional_metadata = [additional_metadata]
else:
additional_metadata = list(additional_metadata)
with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
fs = {
pool.submit(
self.write_doc_to_s3,
doc=itr_doc,
search_keys=key + additional_metadata + self.searchable_fields,
)
for itr_doc in docs
}
fs, _ = wait(fs)
search_docs = [sdoc.result() for sdoc in fs]
# Use store's update to remove key clashes
self.index.update(search_docs, key=self.key)
def _get_session(self):
if not hasattr(self._thread_local, "s3_bucket"):
if isinstance(self.s3_profile, dict):
return Session(**self.s3_profile)
else:
return Session(profile_name=self.s3_profile)
def _get_bucket(self):
"""
If on the main thread return the bucket created above, else create a new bucket on each thread
"""
if threading.current_thread().name == "MainThread":
return self.s3_bucket
if not hasattr(self._thread_local, "s3_bucket"):
session = self._get_session()
resource = session.resource("s3", endpoint_url=self.endpoint_url)
self._thread_local.s3_bucket = resource.Bucket(self.bucket)
return self._thread_local.s3_bucket
def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
"""
Write the data to s3 and return the metadata to be inserted into the index db
Args:
doc: the document
search_keys: list of keys to pull from the docs and be inserted into the
index db
"""
s3_bucket = self._get_bucket()
search_doc = {k: doc[k] for k in search_keys}
search_doc[self.key] = doc[self.key] # Ensure key is in metadata
if self.sub_dir != "":
search_doc["sub_dir"] = self.sub_dir
# Remove MongoDB _id from search
if "_id" in search_doc:
del search_doc["_id"]
# to make hashing more meaningful, make sure last updated field is removed
lu_info = doc.pop(self.last_updated_field, None)
data = msgpack.packb(doc, default=monty_default)
if self.compress:
# Compress with zlib if chosen
search_doc["compression"] = "zlib"
data = zlib.compress(data)
if self.last_updated_field in doc:
# need this conversion for aws metadata insert
search_doc[self.last_updated_field] = str(
to_isoformat_ceil_ms(doc[self.last_updated_field])
)
# keep a record of original keys, in case these are important for the individual researcher
# it is not expected that this information will be used except in disaster recovery
s3_to_mongo_keys = {k: self._sanitize_key(k) for k in search_doc.keys()}
s3_to_mongo_keys["s3-to-mongo-keys"] = "s3-to-mongo-keys" # inception
# encode dictionary since values have to be strings
search_doc["s3-to-mongo-keys"] = dumps(s3_to_mongo_keys)
s3_bucket.put_object(
Key=self.sub_dir + str(doc[self.key]),
Body=data,
Metadata={s3_to_mongo_keys[k]: str(v) for k, v in search_doc.items()},
)
if lu_info is not None:
search_doc[self.last_updated_field] = lu_info
if self.store_hash:
hasher = sha1()
hasher.update(data)
obj_hash = hasher.hexdigest()
search_doc["obj_hash"] = obj_hash
return search_doc
@staticmethod
def _sanitize_key(key):
"""
Sanitize keys to store in S3/MinIO metadata.
"""
# Any underscores are encoded as double dashes in metadata, since keys with
# underscores may be result in the corresponding HTTP header being stripped
# by certain server configurations (e.g. default nginx), leading to:
# `botocore.exceptions.ClientError: An error occurred (AccessDenied) when
# calling the PutObject operation: There were headers present in the request
# which were not signed`
# Metadata stored in the MongoDB index (self.index) is stored unchanged.
# Additionally, MinIO requires lowercase keys
return str(key).replace("_", "-").lower()
def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
remove_s3_object: whether to remove the actual S3 Object or not
"""
if not remove_s3_object:
self.index.remove_docs(criteria=criteria)
else:
to_remove = self.index.distinct(self.key, criteria=criteria)
self.index.remove_docs(criteria=criteria)
# Can remove up to 1000 items at a time via boto
to_remove_chunks = list(grouper(to_remove, n=1000))
for chunk_to_remove in to_remove_chunks:
objlist = [{"Key": f"{self.sub_dir}{obj}"} for obj in chunk_to_remove]
self.s3_bucket.delete_objects(Delete={"Objects": objlist})
@property
def last_updated(self):
return self.index.last_updated
def newer_in(
self, target: Store, criteria: Optional[Dict] = None, exhaustive: bool = False
) -> List[str]:
"""
Returns the keys of documents that are newer in the target
Store than this Store.
Args:
target: target Store
criteria: PyMongo filter for documents to search in
exhaustive: triggers an item-by-item check vs. checking
the last_updated of the target Store and using
that to filter out new items in
"""
if hasattr(target, "index"):
return self.index.newer_in(
target=target.index, criteria=criteria, exhaustive=exhaustive
)
else:
return self.index.newer_in(
target=target, criteria=criteria, exhaustive=exhaustive
)
def __hash__(self):
return hash((self.index.__hash__, self.bucket))
def rebuild_index_from_s3_data(self, **kwargs):
"""
Rebuilds the index Store from the data in S3
Relies on the index document being stores as the metadata for the file
This can help recover lost databases
"""
bucket = self.s3_bucket
objects = bucket.objects.filter(Prefix=self.sub_dir)
for obj in objects:
key_ = self.sub_dir + obj.key
data = self.s3_bucket.Object(key_).get()["Body"].read()
if self.compress:
data = zlib.decompress(data)
unpacked_data = msgpack.unpackb(data, raw=False)
self.update(unpacked_data, **kwargs)
def rebuild_metadata_from_index(self, index_query: dict = None):
"""
Read data from the index store and populate the metadata of the S3 bucket
Force all of the keys to be lower case to be Minio compatible
Args:
index_query: query on the index store
"""
qq = {} if index_query is None else index_query
for index_doc in self.index.query(qq):
key_ = self.sub_dir + index_doc[self.key]
s3_object = self.s3_bucket.Object(key_)
new_meta = {self._sanitize_key(k): v for k, v in s3_object.metadata.items()}
for k, v in index_doc.items():
new_meta[str(k).lower()] = v
new_meta.pop("_id")
if self.last_updated_field in new_meta:
new_meta[self.last_updated_field] = str(
to_isoformat_ceil_ms(new_meta[self.last_updated_field])
)
# s3_object.metadata.update(new_meta)
s3_object.copy_from(
CopySource={"Bucket": self.s3_bucket.name, "Key": key_},
Metadata=new_meta,
MetadataDirective="REPLACE",
)
def __eq__(self, other: object) -> bool:
"""
Check equality for S3Store
other: other S3Store to compare with
"""
if not isinstance(other, S3Store):
return False
fields = ["index", "bucket", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
last_updated
property
readonly
¶
Provides the most recent last_updated date time stamp from the documents in this Store
name: str
property
readonly
¶
Returns:
Type | Description |
---|---|
str | a string representing this data source |
__eq__(self, other)
special
¶
Check equality for S3Store. other: other S3Store to compare with
Source code in maggma/stores/aws.py
def __eq__(self, other: object) -> bool:
"""
Check equality for S3Store
other: other S3Store to compare with
"""
if not isinstance(other, S3Store):
return False
fields = ["index", "bucket", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__init__(self, index, bucket, s3_profile=None, compress=False, endpoint_url=None, sub_dir=None, s3_workers=1, s3_resource_kwargs=None, key='fs_id', store_hash=True, unpack_data=True, searchable_fields=None, **kwargs)
special
¶
Initializes an S3 Store
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index | Store | a store to use to index the S3 Bucket | required |
bucket | str | name of the bucket | required |
s3_profile | Union[str, dict] | name of aws profile containing credentials for role. Alternatively you can pass in a dictionary with the full credentials: aws_access_key_id (string) -- AWS access key ID; aws_secret_access_key (string) -- AWS secret access key; aws_session_token (string) -- AWS temporary session token; region_name (string) -- Default region when creating new connections | None |
compress | bool | compress files inserted into the store | False |
endpoint_url | str | endpoint_url to allow interface to minio service | None |
sub_dir | str | (optional) subdirectory of the s3 bucket to store the data | None |
s3_workers | int | number of concurrent S3 puts to run | 1 |
store_hash | bool | store the sha1 hash right before insertion to the database. | True |
unpack_data | bool | whether to decompress and unpack byte data when querying from the bucket. | True |
searchable_fields | Optional[List[str]] | fields to keep in the index store | None |
Source code in maggma/stores/aws.py
def __init__(
self,
index: Store,
bucket: str,
s3_profile: Optional[Union[str, dict]] = None,
compress: bool = False,
endpoint_url: str = None,
sub_dir: str = None,
s3_workers: int = 1,
s3_resource_kwargs: Optional[dict] = None,
key: str = "fs_id",
store_hash: bool = True,
unpack_data: bool = True,
searchable_fields: Optional[List[str]] = None,
**kwargs,
):
"""
Initializes an S3 Store
Args:
index: a store to use to index the S3 Bucket
bucket: name of the bucket
s3_profile: name of aws profile containing credentials for role.
Alternatively you can pass in a dictionary with the full credentials:
aws_access_key_id (string) -- AWS access key ID
aws_secret_access_key (string) -- AWS secret access key
aws_session_token (string) -- AWS temporary session token
region_name (string) -- Default region when creating new connections
compress: compress files inserted into the store
endpoint_url: endpoint_url to allow interface to minio service
sub_dir: (optional) subdirectory of the s3 bucket to store the data
s3_workers: number of concurrent S3 puts to run
store_hash: store the sha1 hash right before insertion to the database.
unpack_data: whether to decompress and unpack byte data when querying from the bucket.
searchable_fields: fields to keep in the index store
"""
if boto3 is None:
raise RuntimeError("boto3 and botocore are required for S3Store")
self.index = index
self.bucket = bucket
self.s3_profile = s3_profile
self.compress = compress
self.endpoint_url = endpoint_url
self.sub_dir = sub_dir.strip("/") + "/" if sub_dir else ""
self.s3 = None # type: Any
self.s3_bucket = None # type: Any
self.s3_workers = s3_workers
self.s3_resource_kwargs = (
s3_resource_kwargs if s3_resource_kwargs is not None else {}
)
self.unpack_data = unpack_data
self.searchable_fields = (
searchable_fields if searchable_fields is not None else []
)
self.store_hash = store_hash
# Force the key to be the same as the index
assert isinstance(
index.key, str
), "Since we are using the key as a file name in S3, they key must be a string"
if key != index.key:
warnings.warn(
f'The desired S3Store key "{key}" does not match the index key "{index.key},"'
"the index key will be used",
UserWarning,
)
kwargs["key"] = str(index.key)
self._thread_local = threading.local()
super(S3Store, self).__init__(**kwargs)
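A minimal sketch of wiring up an S3Store. The bucket name and AWS profile are placeholders, and a MemoryStore keyed on "fs_id" is used as the index purely for illustration (any maggma Store works):

```python
from maggma.stores import MemoryStore
from maggma.stores.aws import S3Store

index = MemoryStore("s3_index", key="fs_id")
store = S3Store(
    index=index,
    bucket="my-maggma-bucket",
    s3_profile="my-aws-profile",   # or a dict with explicit credentials
    compress=True,
    sub_dir="calculations",
    s3_workers=4,
)
store.connect()
```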
close(self)
¶
Closes any connections
Source code in maggma/stores/aws.py
def close(self):
"""
Closes any connections
"""
self.index.close()
self.s3 = None
self.s3_bucket = None
connect(self, *args, **kwargs)
¶
Connect to the source data
Source code in maggma/stores/aws.py
def connect(self, *args, **kwargs): # lgtm[py/conflicting-attributes]
"""
Connect to the source data
"""
session = self._get_session()
resource = session.resource(
"s3", endpoint_url=self.endpoint_url, **self.s3_resource_kwargs
)
if not self.s3:
self.s3 = resource
try:
self.s3.meta.client.head_bucket(Bucket=self.bucket)
except ClientError:
raise RuntimeError("Bucket not present on AWS: {}".format(self.bucket))
self.s3_bucket = resource.Bucket(self.bucket)
self.index.connect(*args, **kwargs)
count(self, criteria=None)
¶
Counts the number of documents matching the query criteria
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to count in | None |
Source code in maggma/stores/aws.py
def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
"""
return self.index.count(criteria)
distinct(self, field, criteria=None, all_exist=False)
¶
Get all distinct values for a field
Parameters:
Name | Type | Description | Default |
---|---|---|---|
field | str | the field(s) to get distinct values for | required |
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
Source code in maggma/stores/aws.py
def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
"""
Get all distinct values for a field
Args:
field: the field(s) to get distinct values for
criteria: PyMongo filter for documents to search in
"""
# Index is a store so it should have its own distinct function
return self.index.distinct(field, criteria=criteria)
ensure_index(self, key, unique=False)
¶
Tries to create an index and returns true if it succeeded
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | str | single key to index | required |
unique | bool | Whether or not this index contains only unique keys | False |
Returns:
Type | Description |
---|---|
bool | bool indicating if the index exists/was created |
Source code in maggma/stores/aws.py
def ensure_index(self, key: str, unique: bool = False) -> bool:
"""
Tries to create an index and return true if it suceeded
Args:
key: single key to index
unique: Whether or not this index contains only unique keys
Returns:
bool indicating if the index exists/was created
"""
return self.index.ensure_index(key, unique=unique)
groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Simple grouping function that will group documents by keys.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys | Union[List[str], str] | fields to group documents | required |
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
Returns:
Type | Description |
---|---|
Iterator[Tuple[Dict, List[Dict]]] | generator returning tuples of (dict, list of docs) |
Source code in maggma/stores/aws.py
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (dict, list of docs)
"""
return self.index.groupby(
keys=keys,
criteria=criteria,
properties=properties,
sort=sort,
skip=skip,
limit=limit,
)
newer_in(self, target, criteria=None, exhaustive=False)
¶
Returns the keys of documents that are newer in the target Store than this Store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target | Store | target Store | required |
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
exhaustive | bool | triggers an item-by-item check vs. checking the last_updated of the target Store and using that to filter out new items in | False |
Source code in maggma/stores/aws.py
def newer_in(
self, target: Store, criteria: Optional[Dict] = None, exhaustive: bool = False
) -> List[str]:
"""
Returns the keys of documents that are newer in the target
Store than this Store.
Args:
target: target Store
criteria: PyMongo filter for documents to search in
exhaustive: triggers an item-by-item check vs. checking
the last_updated of the target Store and using
that to filter out new items in
"""
if hasattr(target, "index"):
return self.index.newer_in(
target=target.index, criteria=criteria, exhaustive=exhaustive
)
else:
return self.index.newer_in(
target=target, criteria=criteria, exhaustive=exhaustive
)
query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Queries the Store for a set of documents
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
Source code in maggma/stores/aws.py
def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
"""
prop_keys = set()
if isinstance(properties, dict):
prop_keys = set(properties.keys())
elif isinstance(properties, list):
prop_keys = set(properties)
for doc in self.index.query(
criteria=criteria, sort=sort, limit=limit, skip=skip
):
if properties is not None and prop_keys.issubset(set(doc.keys())):
yield {p: doc[p] for p in properties if p in doc}
else:
try:
# TODO: THis is ugly and unsafe, do some real checking before pulling data
data = (
self.s3_bucket.Object(self.sub_dir + str(doc[self.key]))
.get()["Body"]
.read()
)
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the object does not exist.
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
self.logger.error(
"Could not find S3 object {}".format(doc[self.key])
)
break
else:
raise e
if self.unpack_data:
data = self._unpack(
data=data, compressed=doc.get("compression", "") == "zlib"
)
if self.last_updated_field in doc:
data[self.last_updated_field] = doc[self.last_updated_field]
yield data
rebuild_index_from_s3_data(self, **kwargs)
¶
Rebuilds the index Store from the data in S3. Relies on the index document being stored as the metadata for the file. This can help recover lost databases.
Source code in maggma/stores/aws.py
def rebuild_index_from_s3_data(self, **kwargs):
"""
Rebuilds the index Store from the data in S3
Relies on the index document being stores as the metadata for the file
This can help recover lost databases
"""
bucket = self.s3_bucket
objects = bucket.objects.filter(Prefix=self.sub_dir)
for obj in objects:
key_ = self.sub_dir + obj.key
data = self.s3_bucket.Object(key_).get()["Body"].read()
if self.compress:
data = zlib.decompress(data)
unpacked_data = msgpack.unpackb(data, raw=False)
self.update(unpacked_data, **kwargs)
rebuild_metadata_from_index(self, index_query=None)
¶
Reads data from the index store and populates the metadata of the S3 bucket. Forces all of the keys to be lowercase to be MinIO compatible.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index_query | dict | query on the index store | None |
Source code in maggma/stores/aws.py
def rebuild_metadata_from_index(self, index_query: dict = None):
"""
Read data from the index store and populate the metadata of the S3 bucket
Force all of the keys to be lower case to be Minio compatible
Args:
index_query: query on the index store
"""
qq = {} if index_query is None else index_query
for index_doc in self.index.query(qq):
key_ = self.sub_dir + index_doc[self.key]
s3_object = self.s3_bucket.Object(key_)
new_meta = {self._sanitize_key(k): v for k, v in s3_object.metadata.items()}
for k, v in index_doc.items():
new_meta[str(k).lower()] = v
new_meta.pop("_id")
if self.last_updated_field in new_meta:
new_meta[self.last_updated_field] = str(
to_isoformat_ceil_ms(new_meta[self.last_updated_field])
)
# s3_object.metadata.update(new_meta)
s3_object.copy_from(
CopySource={"Bucket": self.s3_bucket.name, "Key": key_},
Metadata=new_meta,
MetadataDirective="REPLACE",
)
remove_docs(self, criteria, remove_s3_object=False)
¶
Remove docs matching the query dictionary
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Dict | query dictionary to match | required |
remove_s3_object | bool | whether to remove the actual S3 Object or not | False |
Source code in maggma/stores/aws.py
def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
remove_s3_object: whether to remove the actual S3 Object or not
"""
if not remove_s3_object:
self.index.remove_docs(criteria=criteria)
else:
to_remove = self.index.distinct(self.key, criteria=criteria)
self.index.remove_docs(criteria=criteria)
# Can remove up to 1000 items at a time via boto
to_remove_chunks = list(grouper(to_remove, n=1000))
for chunk_to_remove in to_remove_chunks:
objlist = [{"Key": f"{self.sub_dir}{obj}"} for obj in chunk_to_remove]
self.s3_bucket.delete_objects(Delete={"Objects": objlist})
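By default only the index entries are removed; a short sketch of both behaviours, with hypothetical key values:

```python
# Remove only the index entries; the objects stay in the bucket
store.remove_docs({"fs_id": "calc-0002"})

# Remove the index entries and delete the corresponding S3 objects as well
store.remove_docs({"fs_id": "calc-0001"}, remove_s3_object=True)
```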
update(self, docs, key=None, additional_metadata=None)
¶
Update documents into the Store
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docs | Union[List[Dict], Dict] | the document or list of documents to update | required |
key | Union[List, str] | field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used | None |
additional_metadata | Union[str, List[str]] | field(s) to include in the s3 store's metadata | None |
Source code in maggma/stores/aws.py
def update(
self,
docs: Union[List[Dict], Dict],
key: Union[List, str, None] = None,
additional_metadata: Union[str, List[str], None] = None,
):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
additional_metadata: field(s) to include in the s3 store's metadata
"""
if not isinstance(docs, list):
docs = [docs]
if isinstance(key, str):
key = [key]
elif not key:
key = [self.key]
if additional_metadata is None:
additional_metadata = []
elif isinstance(additional_metadata, str):
additional_metadata = [additional_metadata]
else:
additional_metadata = list(additional_metadata)
with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
fs = {
pool.submit(
self.write_doc_to_s3,
doc=itr_doc,
search_keys=key + additional_metadata + self.searchable_fields,
)
for itr_doc in docs
}
fs, _ = wait(fs)
search_docs = [sdoc.result() for sdoc in fs]
# Use store's update to remove key clashes
self.index.update(search_docs, key=self.key)
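A sketch of a round trip through the store (field names are illustrative). Passing additional_metadata copies "energy" into the index, so documents can be filtered without downloading the msgpack blobs:

```python
docs = [
    {"fs_id": "calc-0001", "energy": -1.23, "forces": [[0.0, 0.0, 0.1]]},
    {"fs_id": "calc-0002", "energy": -4.56, "forces": [[0.2, 0.0, 0.0]]},
]
store.update(docs, additional_metadata="energy")

# Filtering happens against the index store; matching blobs are then
# pulled from S3, decompressed, and unpacked.
for doc in store.query({"energy": {"$lt": -2.0}}):
    print(doc["fs_id"], doc["energy"])
```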
write_doc_to_s3(self, doc, search_keys)
¶
Write the data to s3 and return the metadata to be inserted into the index db
Parameters:
Name | Type | Description | Default |
---|---|---|---|
doc | Dict | the document | required |
search_keys | List[str] | list of keys to pull from the docs and be inserted into the index db | required |
Source code in maggma/stores/aws.py
def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
"""
Write the data to s3 and return the metadata to be inserted into the index db
Args:
doc: the document
search_keys: list of keys to pull from the docs and be inserted into the
index db
"""
s3_bucket = self._get_bucket()
search_doc = {k: doc[k] for k in search_keys}
search_doc[self.key] = doc[self.key] # Ensure key is in metadata
if self.sub_dir != "":
search_doc["sub_dir"] = self.sub_dir
# Remove MongoDB _id from search
if "_id" in search_doc:
del search_doc["_id"]
# to make hashing more meaningful, make sure last updated field is removed
lu_info = doc.pop(self.last_updated_field, None)
data = msgpack.packb(doc, default=monty_default)
if self.compress:
# Compress with zlib if chosen
search_doc["compression"] = "zlib"
data = zlib.compress(data)
if self.last_updated_field in doc:
# need this conversion for aws metadata insert
search_doc[self.last_updated_field] = str(
to_isoformat_ceil_ms(doc[self.last_updated_field])
)
# keep a record of original keys, in case these are important for the individual researcher
# it is not expected that this information will be used except in disaster recovery
s3_to_mongo_keys = {k: self._sanitize_key(k) for k in search_doc.keys()}
s3_to_mongo_keys["s3-to-mongo-keys"] = "s3-to-mongo-keys" # inception
# encode dictionary since values have to be strings
search_doc["s3-to-mongo-keys"] = dumps(s3_to_mongo_keys)
s3_bucket.put_object(
Key=self.sub_dir + str(doc[self.key]),
Body=data,
Metadata={s3_to_mongo_keys[k]: str(v) for k, v in search_doc.items()},
)
if lu_info is not None:
search_doc[self.last_updated_field] = lu_info
if self.store_hash:
hasher = sha1()
hasher.update(data)
obj_hash = hasher.hexdigest()
search_doc["obj_hash"] = obj_hash
return search_doc
Advanced Stores for behavior outside normal access patterns
AliasingStore (Store)
¶
Special Store that provides aliases for the primary accessors
Source code in maggma/stores/advanced_stores.py
class AliasingStore(Store):
"""
Special Store that aliases for the primary accessors
"""
def __init__(self, store: Store, aliases: Dict, **kwargs):
"""
Args:
store: the store to wrap around
aliases: dict of aliases of the form external key: internal key
"""
self.store = store
# Given an external key tells what the internal key is
self.aliases = aliases
# Given the internal key tells us what the external key is
self.reverse_aliases = {v: k for k, v in aliases.items()}
self.kwargs = kwargs
kwargs.update(
{
"last_updated_field": store.last_updated_field,
"last_updated_type": store.last_updated_type,
}
)
super(AliasingStore, self).__init__(**kwargs)
@property
def name(self) -> str:
"""
Return a string representing this data source
"""
return self.store.name
def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
"""
criteria = criteria if criteria else {}
lazy_substitute(criteria, self.reverse_aliases)
return self.store.count(criteria)
def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
"""
criteria = criteria if criteria else {}
if properties is not None:
if isinstance(properties, list):
properties = {p: 1 for p in properties}
substitute(properties, self.reverse_aliases)
lazy_substitute(criteria, self.reverse_aliases)
for d in self.store.query(
properties=properties, criteria=criteria, sort=sort, limit=limit, skip=skip
):
substitute(d, self.aliases)
yield d
def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
"""
Get all distinct values for a field
Args:
field: the field(s) to get distinct values for
criteria: PyMongo filter for documents to search in
"""
criteria = criteria if criteria else {}
lazy_substitute(criteria, self.reverse_aliases)
# substitute forward
return self.store.distinct(self.aliases[field], criteria=criteria)
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (dict, list of docs)
"""
# Convert to a list
keys = keys if isinstance(keys, list) else [keys]
# Make the aliasing transformations on keys
keys = [self.aliases[k] if k in self.aliases else k for k in keys]
# Update criteria and properties based on aliases
criteria = criteria if criteria else {}
if properties is not None:
if isinstance(properties, list):
properties = {p: 1 for p in properties}
substitute(properties, self.reverse_aliases)
lazy_substitute(criteria, self.reverse_aliases)
return self.store.groupby(
keys=keys, properties=properties, criteria=criteria, skip=skip, limit=limit
)
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
key = key if key else self.key
for d in docs:
substitute(d, self.reverse_aliases)
if key in self.aliases:
key = self.aliases[key]
self.store.update(docs, key=key)
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
"""
# Update criteria and properties based on aliases
lazy_substitute(criteria, self.reverse_aliases)
self.store.remove_docs(criteria)
def ensure_index(self, key, unique=False, **kwargs):
if key in self.aliases:
key = self.aliases[key]
return self.store.ensure_index(key, unique, **kwargs)
def close(self):
self.store.close()
@property
def _collection(self):
return self.store._collection
def connect(self, force_reset=False):
self.store.connect(force_reset=force_reset)
def __eq__(self, other: object) -> bool:
"""
Check equality for AliasingStore
Args:
other: other AliasingStore to compare with
"""
if not isinstance(other, AliasingStore):
return False
fields = ["store", "aliases", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
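A minimal usage sketch (assuming a MemoryStore backing store; the field names are illustrative): documents stored under the internal key are exposed and queried under their external aliases.
from maggma.stores import MemoryStore
from maggma.stores.advanced_stores import AliasingStore

internal = MemoryStore("tasks", key="task_id")
# aliases map external key -> internal key
store = AliasingStore(internal, aliases={"material_id": "task_id"})
store.connect()

store.update([{"material_id": "mp-1", "energy": -1.2}])    # stored internally under "task_id"
doc = next(store.query(criteria={"material_id": "mp-1"}))  # criteria uses the external name
print(doc["material_id"])                                  # internal key is aliased back on read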
name: str
property
readonly
¶
Return a string representing this data source
__eq__(self, other)
special
¶
Check equality for AliasingStore
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | object | other AliasingStore to compare with | required |
Source code in maggma/stores/advanced_stores.py
def __eq__(self, other: object) -> bool:
"""
Check equality for AliasingStore
Args:
other: other AliasingStore to compare with
"""
if not isinstance(other, AliasingStore):
return False
fields = ["store", "aliases", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__init__(self, store, aliases, **kwargs)
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store | Store | the store to wrap around | required |
aliases | Dict | dict of aliases of the form external key: internal key | required |
Source code in maggma/stores/advanced_stores.py
def __init__(self, store: Store, aliases: Dict, **kwargs):
"""
Args:
store: the store to wrap around
aliases: dict of aliases of the form external key: internal key
"""
self.store = store
# Given an external key tells what the internal key is
self.aliases = aliases
# Given the internal key tells us what the external key is
self.reverse_aliases = {v: k for k, v in aliases.items()}
self.kwargs = kwargs
kwargs.update(
{
"last_updated_field": store.last_updated_field,
"last_updated_type": store.last_updated_type,
}
)
super(AliasingStore, self).__init__(**kwargs)
close(self)
¶
Closes any connections
Source code in maggma/stores/advanced_stores.py
def close(self):
self.store.close()
connect(self, force_reset=False)
¶
Connect to the source data
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force_reset | | whether to reset the connection or not | False |
Source code in maggma/stores/advanced_stores.py
def connect(self, force_reset=False):
self.store.connect(force_reset=force_reset)
count(self, criteria=None)
¶
Counts the number of documents matching the query criteria
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to count in | None |
Source code in maggma/stores/advanced_stores.py
def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
"""
criteria = criteria if criteria else {}
lazy_substitute(criteria, self.reverse_aliases)
return self.store.count(criteria)
distinct(self, field, criteria=None, all_exist=False)
¶
Get all distinct values for a field
Parameters:
Name | Type | Description | Default |
---|---|---|---|
field | str | the field(s) to get distinct values for | required |
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
Source code in maggma/stores/advanced_stores.py
def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
) -> List:
"""
Get all distinct values for a field
Args:
field: the field(s) to get distinct values for
criteria: PyMongo filter for documents to search in
"""
criteria = criteria if criteria else {}
lazy_substitute(criteria, self.reverse_aliases)
# substitute forward
return self.store.distinct(self.aliases[field], criteria=criteria)
ensure_index(self, key, unique=False, **kwargs)
¶
Tries to create an index and returns true if it succeeded
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | | single key to index | required |
unique | | Whether or not this index contains only unique keys | False |
Returns:
Type | Description |
---|---|
| bool indicating if the index exists/was created |
Source code in maggma/stores/advanced_stores.py
def ensure_index(self, key, unique=False, **kwargs):
if key in self.aliases:
key = self.aliases[key]
return self.store.ensure_index(key, unique, **kwargs)
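Continuing the sketch above, an index request on an external name is translated to the wrapped store's internal field before being delegated:
store.ensure_index("material_id", unique=True)   # index is created on the internal "task_id" field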
groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Simple grouping function that will group documents by keys.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys | Union[List[str], str] | fields to group documents | required |
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
Returns:
Type | Description |
---|---|
Iterator[Tuple[Dict, List[Dict]]] | generator returning tuples of (dict, list of docs) |
Source code in maggma/stores/advanced_stores.py
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (dict, list of docs)
"""
# Convert to a list
keys = keys if isinstance(keys, list) else [keys]
# Make the aliasing transformations on keys
keys = [self.aliases[k] if k in self.aliases else k for k in keys]
# Update criteria and properties based on aliases
criteria = criteria if criteria else {}
if properties is not None:
if isinstance(properties, list):
properties = {p: 1 for p in properties}
substitute(properties, self.reverse_aliases)
lazy_substitute(criteria, self.reverse_aliases)
return self.store.groupby(
keys=keys, properties=properties, criteria=criteria, skip=skip, limit=limit
)
query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Queries the Store for a set of documents
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
Source code in maggma/stores/advanced_stores.py
def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
"""
criteria = criteria if criteria else {}
if properties is not None:
if isinstance(properties, list):
properties = {p: 1 for p in properties}
substitute(properties, self.reverse_aliases)
lazy_substitute(criteria, self.reverse_aliases)
for d in self.store.query(
properties=properties, criteria=criteria, sort=sort, limit=limit, skip=skip
):
substitute(d, self.aliases)
yield d
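Continuing the sketch above, projections given in properties may also use external names; they are mapped to internal fields for the underlying query and aliased back in the results:
docs = list(store.query(criteria={"material_id": "mp-1"}, properties=["material_id"]))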
remove_docs(self, criteria)
¶
Remove docs matching the query dictionary
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Dict | query dictionary to match | required |
Source code in maggma/stores/advanced_stores.py
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
"""
# Update criteria and properties based on aliases
lazy_substitute(criteria, self.reverse_aliases)
self.store.remove_docs(criteria)
update(self, docs, key=None)
¶
Update documents into the Store
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docs | Union[List[Dict], Dict] | the document or list of documents to update | required |
key | Union[List, str] | field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used | None |
Source code in maggma/stores/advanced_stores.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
key = key if key else self.key
for d in docs:
substitute(d, self.reverse_aliases)
if key in self.aliases:
key = self.aliases[key]
self.store.update(docs, key=key)
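Continuing the sketch above, when an explicit key is given by its external name, it is mapped to the internal key before the write is delegated:
store.update([{"material_id": "mp-2", "energy": -0.8}], key="material_id")   # delegated with key="task_id"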
MongograntStore (MongoStore)
¶
Initialize a Store with a mongogrant "<role>:<host>/<db>" spec.
Some class methods of MongoStore, e.g. from_db_file and from_collection, are not supported.
mongogrant documentation: https://github.com/materialsproject/mongogrant
Source code in maggma/stores/advanced_stores.py
class MongograntStore(MongoStore):
"""Initialize a Store with a mongogrant "`<role>`:`<host>`/`<db>`." spec.
Some class methods of MongoStore, e.g. from_db_file and from_collection,
are not supported.
mongogrant documentation: https://github.com/materialsproject/mongogrant
"""
def __init__(
self,
mongogrant_spec: str,
collection_name: str,
mgclient_config_path: Optional[str] = None,
**kwargs,
):
"""
Args:
mongogrant_spec: of the form `<role>`:`<host>`/`<db>`, where
role is one of {"read", "readWrite"} or aliases {"ro", "rw"};
host is a db host (w/ optional port) or alias; and db is a db
on that host, or alias. See mongogrant documentation.
collection_name: name of mongo collection
mgclient_config_path: Path to mongogrant client config file,
or None if default path (`mongogrant.client.path`).
"""
self.mongogrant_spec = mongogrant_spec
self.collection_name = collection_name
self.mgclient_config_path = mgclient_config_path
self._coll = None
if self.mgclient_config_path:
config = Config(check=check, path=self.mgclient_config_path)
client = Client(config)
else:
client = Client()
if set(("username", "password", "database", "host")) & set(kwargs):
raise StoreError(
"MongograntStore does not accept "
"username, password, database, or host "
"arguments. Use `mongogrant_spec`."
)
self.kwargs = kwargs
_auth_info = client.get_db_auth_from_spec(self.mongogrant_spec)
super(MongograntStore, self).__init__(
host=_auth_info["host"],
database=_auth_info["authSource"],
username=_auth_info["username"],
password=_auth_info["password"],
collection_name=self.collection_name,
**kwargs,
)
@property
def name(self):
return f"mgrant://{self.mongogrant_spec}/{self.collection_name}"
def __hash__(self):
return hash(
(self.mongogrant_spec, self.collection_name, self.last_updated_field)
)
@classmethod
def from_db_file(cls, file):
"""
Raises ValueError since MongograntStores can't be initialized from a file
"""
raise ValueError("MongograntStore doesn't implement from_db_file")
@classmethod
def from_collection(cls, collection):
"""
Raises ValueError since MongograntStores can't be initialized from a PyMongo collection
"""
raise ValueError("MongograntStore doesn't implement from_collection")
def __eq__(self, other: object) -> bool:
"""
Check equality for MongograntStore
Args:
other: other MongograntStore to compare with
"""
if not isinstance(other, MongograntStore):
return False
fields = [
"mongogrant_spec",
"collection_name",
"mgclient_config_path",
"last_updated_field",
]
return all(getattr(self, f) == getattr(other, f) for f in fields)
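A brief usage sketch, assuming a mongogrant client config already grants access; the spec and collection name below are illustrative:
from maggma.stores.advanced_stores import MongograntStore

store = MongograntStore("ro:mongodb.example.com/core_db", "tasks")
store.connect()
print(store.name)   # mgrant://ro:mongodb.example.com/core_db/tasks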
name
property
readonly
¶
Return a string representing this data source
__eq__(self, other)
special
¶
Check equality for MongograntStore
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | object | other MongograntStore to compare with | required |
Source code in maggma/stores/advanced_stores.py
def __eq__(self, other: object) -> bool:
"""
Check equality for MongograntStore
Args:
other: other MongograntStore to compare with
"""
if not isinstance(other, MongograntStore):
return False
fields = [
"mongogrant_spec",
"collection_name",
"mgclient_config_path",
"last_updated_field",
]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__init__(self, mongogrant_spec, collection_name, mgclient_config_path=None, **kwargs)
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mongogrant_spec | str | of the form `<role>`:`<host>`/`<db>`, where role is one of {"read", "readWrite"} or aliases {"ro", "rw"}; host is a db host (w/ optional port) or alias; and db is a db on that host, or alias. See mongogrant documentation. | required |
collection_name | str | name of mongo collection | required |
mgclient_config_path | Optional[str] | Path to mongogrant client config file, or None if default path (`mongogrant.client.path`). | None |
Source code in maggma/stores/advanced_stores.py
def __init__(
self,
mongogrant_spec: str,
collection_name: str,
mgclient_config_path: Optional[str] = None,
**kwargs,
):
"""
Args:
mongogrant_spec: of the form `<role>`:`<host>`/`<db>`, where
role is one of {"read", "readWrite"} or aliases {"ro", "rw"};
host is a db host (w/ optional port) or alias; and db is a db
on that host, or alias. See mongogrant documentation.
collection_name: name of mongo collection
mgclient_config_path: Path to mongogrant client config file,
or None if default path (`mongogrant.client.path`).
"""
self.mongogrant_spec = mongogrant_spec
self.collection_name = collection_name
self.mgclient_config_path = mgclient_config_path
self._coll = None
if self.mgclient_config_path:
config = Config(check=check, path=self.mgclient_config_path)
client = Client(config)
else:
client = Client()
if set(("username", "password", "database", "host")) & set(kwargs):
raise StoreError(
"MongograntStore does not accept "
"username, password, database, or host "
"arguments. Use `mongogrant_spec`."
)
self.kwargs = kwargs
_auth_info = client.get_db_auth_from_spec(self.mongogrant_spec)
super(MongograntStore, self).__init__(
host=_auth_info["host"],
database=_auth_info["authSource"],
username=_auth_info["username"],
password=_auth_info["password"],
collection_name=self.collection_name,
**kwargs,
)
from_collection(collection)
classmethod
¶
Raises ValueError since MongograntStores can't be initialized from a PyMongo collection
Source code in maggma/stores/advanced_stores.py
@classmethod
def from_collection(cls, collection):
"""
Raises ValueError since MongograntStores can't be initialized from a PyMongo collection
"""
raise ValueError("MongograntStore doesn't implement from_collection")
from_db_file(file)
classmethod
¶
Raises ValueError since MongograntStores can't be initialized from a file
Source code in maggma/stores/advanced_stores.py
@classmethod
def from_db_file(cls, file):
"""
Raises ValueError since MongograntStores can't be initialized from a file
"""
raise ValueError("MongograntStore doesn't implement from_db_file")
SandboxStore (Store)
¶
Provides a sandboxed view to another store
Source code in maggma/stores/advanced_stores.py
class SandboxStore(Store):
"""
Provides a sandboxed view to another store
"""
def __init__(self, store: Store, sandbox: str, exclusive: bool = False):
"""
Args:
store: store to wrap sandboxing around
sandbox: the corresponding sandbox
exclusive: whether to be exclusively in this sandbox or include global items
"""
self.store = store
self.sandbox = sandbox
self.exclusive = exclusive
super().__init__(
key=self.store.key,
last_updated_field=self.store.last_updated_field,
last_updated_type=self.store.last_updated_type,
validator=self.store.validator,
)
@property
def name(self) -> str:
"""
Returns:
a string representing this data source
"""
return f"Sandbox[{self.store.name}][{self.sandbox}]"
@property
def sbx_criteria(self) -> Dict:
"""
Returns:
the sandbox criteria dict used to filter the source store
"""
if self.exclusive:
return {"sbxn": self.sandbox}
else:
return {
"$or": [{"sbxn": {"$in": [self.sandbox]}}, {"sbxn": {"$exists": False}}]
}
def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
"""
criteria = (
dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
)
return self.store.count(criteria=criteria)
def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
"""
criteria = (
dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
)
return self.store.query(
properties=properties, criteria=criteria, sort=sort, limit=limit, skip=skip
)
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (dict, list of docs)
"""
criteria = (
dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
)
return self.store.groupby(
keys=keys, properties=properties, criteria=criteria, skip=skip, limit=limit
)
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
for d in docs:
if "sbxn" in d:
d["sbxn"] = list(set(d["sbxn"] + [self.sandbox]))
else:
d["sbxn"] = [self.sandbox]
self.store.update(docs, key=key)
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
"""
# Combine the user criteria with the sandbox criteria
criteria = (
dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
)
self.store.remove_docs(criteria)
def ensure_index(self, key, unique=False, **kwargs):
return self.store.ensure_index(key, unique, **kwargs)
def close(self):
self.store.close()
@property
def _collection(self):
return self.store._collection
def connect(self, force_reset=False):
self.store.connect(force_reset=force_reset)
def __eq__(self, other: object) -> bool:
"""
Check equality for SandboxStore
Args:
other: other SandboxStore to compare with
"""
if not isinstance(other, SandboxStore):
return False
fields = ["store", "sandbox", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
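A minimal usage sketch (MemoryStore backing store, illustrative sandbox name): writes are tagged with the sandbox, and reads are filtered by the sandbox criteria.
from maggma.stores import MemoryStore
from maggma.stores.advanced_stores import SandboxStore

inner = MemoryStore("tasks", key="task_id")
sbx = SandboxStore(inner, sandbox="core", exclusive=False)
sbx.connect()

sbx.update([{"task_id": "mp-1"}])   # written with sbxn=["core"]
print(sbx.count())                  # counts sandboxed docs plus untagged (global) docs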
name: str
property
readonly
¶
Returns:
Type | Description |
---|---|
str | a string representing this data source |
sbx_criteria: Dict
property
readonly
¶
Returns:
Type | Description |
---|---|
Dict | the sandbox criteria dict used to filter the source store |
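Concretely, for a sandbox named "core" (illustrative), the two settings of exclusive produce the following filters:
# exclusive=True: only documents tagged with this sandbox
sbx_criteria = {"sbxn": "core"}
# exclusive=False: sandboxed documents plus documents with no sandbox tag at all
sbx_criteria = {"$or": [{"sbxn": {"$in": ["core"]}}, {"sbxn": {"$exists": False}}]}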
__eq__(self, other)
special
¶
Check equality for SandboxStore
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | object | other SandboxStore to compare with | required |
Source code in maggma/stores/advanced_stores.py
def __eq__(self, other: object) -> bool:
"""
Check equality for SandboxStore
Args:
other: other SandboxStore to compare with
"""
if not isinstance(other, SandboxStore):
return False
fields = ["store", "sandbox", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__init__(self, store, sandbox, exclusive=False)
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store | Store | store to wrap sandboxing around | required |
sandbox | str | the corresponding sandbox | required |
exclusive | bool | whether to be exclusively in this sandbox or include global items | False |
Source code in maggma/stores/advanced_stores.py
def __init__(self, store: Store, sandbox: str, exclusive: bool = False):
"""
Args:
store: store to wrap sandboxing around
sandbox: the corresponding sandbox
exclusive: whether to be exclusively in this sandbox or include global items
"""
self.store = store
self.sandbox = sandbox
self.exclusive = exclusive
super().__init__(
key=self.store.key,
last_updated_field=self.store.last_updated_field,
last_updated_type=self.store.last_updated_type,
validator=self.store.validator,
)
close(self)
¶
Closes any connections
Source code in maggma/stores/advanced_stores.py
def close(self):
self.store.close()
connect(self, force_reset=False)
¶
Connect to the source data
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force_reset | | whether to reset the connection or not | False |
Source code in maggma/stores/advanced_stores.py
def connect(self, force_reset=False):
self.store.connect(force_reset=force_reset)
count(self, criteria=None)
¶
Counts the number of documents matching the query criteria
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to count in | None |
Source code in maggma/stores/advanced_stores.py
def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: PyMongo filter for documents to count in
"""
criteria = (
dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
)
return self.store.count(criteria=criteria)
ensure_index(self, key, unique=False, **kwargs)
¶
Tries to create an index and returns true if it succeeded
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | | single key to index | required |
unique | | Whether or not this index contains only unique keys | False |
Returns:
Type | Description |
---|---|
| bool indicating if the index exists/was created |
Source code in maggma/stores/advanced_stores.py
def ensure_index(self, key, unique=False, **kwargs):
return self.store.ensure_index(key, unique, **kwargs)
groupby(self, keys, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Simple grouping function that will group documents by keys.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys | Union[List[str], str] | fields to group documents | required |
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
Returns:
Type | Description |
---|---|
Iterator[Tuple[Dict, List[Dict]]] | generator returning tuples of (dict, list of docs) |
Source code in maggma/stores/advanced_stores.py
def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
Returns:
generator returning tuples of (dict, list of docs)
"""
criteria = (
dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
)
return self.store.groupby(
keys=keys, properties=properties, criteria=criteria, skip=skip, limit=limit
)
query(self, criteria=None, properties=None, sort=None, skip=0, limit=0)
¶
Queries the Store for a set of documents
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Optional[Dict] | PyMongo filter for documents to search in | None |
properties | Union[Dict, List] | properties to return in grouped documents | None |
sort | Optional[Dict[str, Union[maggma.core.store.Sort, int]]] | Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending. | None |
skip | int | number documents to skip | 0 |
limit | int | limit on total number of documents returned | 0 |
Source code in maggma/stores/advanced_stores.py
def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: PyMongo filter for documents to search in
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number documents to skip
limit: limit on total number of documents returned
"""
criteria = (
dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
)
return self.store.query(
properties=properties, criteria=criteria, sort=sort, limit=limit, skip=skip
)
remove_docs(self, criteria)
¶
Remove docs matching the query dictionary
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Dict | query dictionary to match | required |
Source code in maggma/stores/advanced_stores.py
def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
"""
# Combine the user criteria with the sandbox criteria
criteria = (
dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
)
self.store.remove_docs(criteria)
update(self, docs, key=None)
¶
Update documents into the Store
Parameters:
Name | Type | Description | Default |
---|---|---|---|
docs | Union[List[Dict], Dict] | the document or list of documents to update | required |
key | Union[List, str] | field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store's key field is to be used | None |
Source code in maggma/stores/advanced_stores.py
def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
"""
for d in docs:
if "sbxn" in d:
d["sbxn"] = list(set(d["sbxn"] + [self.sandbox]))
else:
d["sbxn"] = [self.sandbox]
self.store.update(docs, key=key)
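Continuing the sketch above, a document that already carries sandbox tags keeps them; the store's own sandbox is merged in rather than overwriting the list:
sbx.update([{"task_id": "mp-2", "sbxn": ["test"]}])   # stored with sbxn containing both "test" and "core"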
VaultStore (MongoStore)
¶
Extends MongoStore to read credentials out of Vault server and uses these values to initialize MongoStore instance
Source code in maggma/stores/advanced_stores.py
class VaultStore(MongoStore):
"""
Extends MongoStore to read credentials out of Vault server
and uses these values to initialize MongoStore instance
"""
@requires(hvac is not None, "hvac is required to use VaultStore")
def __init__(self, collection_name: str, vault_secret_path: str):
"""
Args:
collection_name: name of mongo collection
vault_secret_path: path on vault server with mongo creds object
Important:
Environment variables that must be set prior to invocation
VAULT_ADDR - URL of vault server (eg. https://matgen8.lbl.gov:8200)
VAULT_TOKEN or GITHUB_TOKEN - token used to authenticate to vault
"""
self.collection_name = collection_name
self.vault_secret_path = vault_secret_path
# TODO: Switch this over to Pydantic ConfigSettings
vault_addr = os.getenv("VAULT_ADDR")
if not vault_addr:
raise RuntimeError("VAULT_ADDR not set")
client = hvac.Client(vault_addr)
# If we have a vault token use this
token = os.getenv("VAULT_TOKEN")
# Look for a github token instead
if not token:
github_token = os.getenv("GITHUB_TOKEN")
if github_token:
client.auth_github(github_token)
else:
raise RuntimeError("VAULT_TOKEN or GITHUB_TOKEN not set")
else:
client.token = token
if not client.is_authenticated():
raise RuntimeError("Bad token")
# Read the vault secret
json_db_creds = client.read(vault_secret_path)
db_creds = json.loads(json_db_creds["data"]["value"])
database = db_creds.get("db")
host = db_creds.get("host", "localhost")
port = db_creds.get("port", 27017)
username = db_creds.get("username", "")
password = db_creds.get("password", "")
super(VaultStore, self).__init__(
database, collection_name, host, port, username, password
)
def __eq__(self, other: object) -> bool:
"""
Check equality for VaultStore
Args:
other: other VaultStore to compare with
"""
if not isinstance(other, VaultStore):
return False
fields = ["vault_secret_path", "collection_name", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__eq__(self, other)
special
¶
Check equality for VaultStore
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | object | other VaultStore to compare with | required |
Source code in maggma/stores/advanced_stores.py
def __eq__(self, other: object) -> bool:
"""
Check equality for VaultStore
Args:
other: other VaultStore to compare with
"""
if not isinstance(other, VaultStore):
return False
fields = ["vault_secret_path", "collection_name", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)
__init__(self, collection_name, vault_secret_path)
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection_name | str | name of mongo collection | required |
vault_secret_path | str | path on vault server with mongo creds object | required |
Important
Environment variables that must be set prior to invocation:
VAULT_ADDR - URL of vault server (e.g. https://matgen8.lbl.gov:8200)
VAULT_TOKEN or GITHUB_TOKEN - token used to authenticate to vault
Source code in maggma/stores/advanced_stores.py
@requires(hvac is not None, "hvac is required to use VaultStore")
def __init__(self, collection_name: str, vault_secret_path: str):
"""
Args:
collection_name: name of mongo collection
vault_secret_path: path on vault server with mongo creds object
Important:
Environment variables that must be set prior to invocation
VAULT_ADDR - URL of vault server (eg. https://matgen8.lbl.gov:8200)
VAULT_TOKEN or GITHUB_TOKEN - token used to authenticate to vault
"""
self.collection_name = collection_name
self.vault_secret_path = vault_secret_path
# TODO: Switch this over to Pydantic ConfigSettings
vault_addr = os.getenv("VAULT_ADDR")
if not vault_addr:
raise RuntimeError("VAULT_ADDR not set")
client = hvac.Client(vault_addr)
# If we have a vault token use this
token = os.getenv("VAULT_TOKEN")
# Look for a github token instead
if not token:
github_token = os.getenv("GITHUB_TOKEN")
if github_token:
client.auth_github(github_token)
else:
raise RuntimeError("VAULT_TOKEN or GITHUB_TOKEN not set")
else:
client.token = token
if not client.is_authenticated():
raise RuntimeError("Bad token")
# Read the vault secret
json_db_creds = client.read(vault_secret_path)
db_creds = json.loads(json_db_creds["data"]["value"])
database = db_creds.get("db")
host = db_creds.get("host", "localhost")
port = db_creds.get("port", 27017)
username = db_creds.get("username", "")
password = db_creds.get("password", "")
super(VaultStore, self).__init__(
database, collection_name, host, port, username, password
)
Special stores that combine underlying Stores together
ConcatStore (Store)
¶
Store concatting multiple stores
Source code in maggma/stores/compound_stores.py
class ConcatStore(Store):
"""Store concatting multiple stores"""
def __init__(self, stores: List[Store], **kwargs):
"""
Initialize a ConcatStore that concatenates multiple stores together
to appear as one store
Args:
stores: list of stores to concatenate together
"""
self.stores = stores
self.kwargs = kwargs
super(ConcatStore, self).__init__(**kwargs)
@property
def name(self) -> str:
"""
A string representing this data source
"&qu