# default_exp storage
Storage¶
Project Structure setup¶
Just two folders: the first contains the objects (e.g. images) to annotate, and the second contains the annotation data/results.
(construct_annotation_path('../data/test_anno_path'),
construct_annotation_path(file_name='../results/annotations.json'),
construct_annotation_path(project_path='../test_anno_path', results_dir='outpi'))
(Path('../data/test_anno_path/results/annotations.json'),
Path('../results/annotations.json'),
Path('../test_anno_path/outpi/annotations.json'))
Generic Storage for Annotations¶
A key-value store: the key is the object_id / file name, and the value is a JSON blob containing the annotation.
#export
class MapeableStorage(MutableMapping):
    """In-memory key-value store for annotations, backed by a plain dict.

    Keys are object ids / file names; values are JSON-able annotation blobs
    (or None when an object has not been annotated yet).
    """

    def __init__(self):
        self.mapping = {}

    def __getitem__(self, key):
        return self.mapping[key]

    def __delitem__(self, key):
        # NOTE: unlike a plain dict, a missing key is silently ignored
        # instead of raising KeyError.
        if key in self:
            del self.mapping[key]

    def __setitem__(self, key, value):
        self.mapping[key] = value

    def __iter__(self):
        return iter(self.mapping)

    def __len__(self):
        return len(self.mapping)

    def to_dict(self, only_annotated=True):
        """Return a deep copy of the contents.

        When `only_annotated` is True, entries whose value is falsy
        (e.g. None — not yet annotated) are dropped.
        """
        if only_annotated:
            return {key: copy.deepcopy(value)
                    for key, value in self.mapping.items() if value}
        return copy.deepcopy(self.mapping)
m = MapeableStorage()
m.update({'test': 1})
m['test']
1
#export
class AnnotationStorage(MapeableStorage):
    """Generic annotation storage seeded from a list of image paths.

    `key` is the string form of an image path (object_id / file name) and
    `value` is the JSON blob containing the annotation (None until set).
    im_paths - list of existing images as <Path> objects.
    """

    def __init__(self, im_paths):
        super().__init__()
        # Register every known image up front with an empty annotation.
        self.update(dict.fromkeys((str(p) for p in im_paths)))

    def __repr__(self):
        return f"{type(self).__name__}({self.mapping})"

    def save(self, file_name):
        """Serialize the mapping to `file_name` as pretty-printed JSON."""
        with open(file_name, 'w', encoding='utf-8') as out:
            json.dump(self.mapping, out, ensure_ascii=False, sort_keys=True, indent=4)

    def load(self, file_name):
        """Replace the current mapping with the JSON content of `file_name`."""
        with open(file_name) as data_file:
            self.mapping = json.load(data_file)
DB backed storage¶
Changes in annotation should be tracked in db.
db
sqlite memory / disk — how do we sync so that race conditions are avoided?
remote db (postgres, mysql etc.) with sqlalchemy layer
write sqlite functions¶
init db
write json + timestamp to db BUT only if json has changed!
iterate over db
iterate over values with latest timestamp
get all history for key
allow for metadata?
check how sqlite write locks work
# export
import sqlite3
-- NOTE(review): scratch/example schema — the 'suppliers' table is not used by
-- the 'objects'/'data' tables elsewhere in this module; presumably kept as a
-- SQLite foreign-key syntax reference. TODO confirm and remove if unneeded.
DROP TABLE suppliers;
CREATE TABLE suppliers (
supplier_id INTEGER PRIMARY KEY,
supplier_name TEXT NOT NULL,
group_id INTEGER NOT NULL,
-- each supplier must reference an existing supplier group
FOREIGN KEY (group_id)
REFERENCES supplier_groups (group_id)
);
SQL helper functions¶
This is needed for a consistent iteration order.
#export
def _get_order_id(conn, object_id, table_name='objects'):
query = """
SELECT orderID from {}
WHERE objectID = '{}'
""".format(table_name, object_id)
c = conn.cursor()
res = c.execute(query).fetchone()
if res is not None:
return res[0]
#export
def _create_order_id(conn, object_id, table_name='objects'):
    """Return the orderID for `object_id`, registering the object if new.

    Idempotent: an existing entry is returned as-is; otherwise a row is
    inserted and the freshly assigned orderID is returned.
    """
    order_id = _get_order_id(conn, object_id, table_name=table_name)
    # Compare against None: `if order_id:` would wrongly treat a valid
    # orderID of 0 as "missing" and insert a duplicate row.
    if order_id is not None:
        return order_id
    # object_id is bound as a parameter to avoid SQL injection.
    query = "INSERT INTO {}('objectID') VALUES(?)".format(table_name)
    c = conn.cursor()
    c.execute(query, (object_id,))
    return _get_order_id(conn, object_id, table_name=table_name)
#export
def _get(conn, object_id, table_name='data'):
query = """
SELECT data FROM {}
WHERE objectID = '{}'
GROUP BY objectID
ORDER BY timestamp
""".format(table_name, object_id)
c = conn.cursor()
res = c.execute(query).fetchone()
if res is not None:
return json.loads(res[0])
#export
def _get_object_id_at_pos(conn, pos, table_name='objects'):
query = """
SELECT objectID FROM {}
ORDER BY orderID
LIMIT {}, 1
""".format(table_name, pos)
c = conn.cursor()
res = c.execute(query).fetchone()
if res is not None:
return res[0]
#export
def _insert(conn, object_id, data: dict, table_name='data', author='author'):
    """Append a new revision of `data` for `object_id`.

    A row is written only when `data` differs from the latest stored
    revision, so repeated identical saves do not pollute the history.
    The object is always registered in the order table so iteration
    order stays stable.
    """
    # Read the latest revision from the SAME table we write to — the
    # original hard-coded 'data' here, which broke de-duplication whenever
    # a custom table_name was used.
    last = _get(conn, object_id, table_name=table_name)
    _create_order_id(conn, object_id)
    if data == last:
        return  # unchanged -> no new revision
    c = conn.cursor()
    c.execute("insert into {}('objectID', 'author', 'data') values (?, ?, ?)".format(table_name),
              [object_id, author, json.dumps(data)])
    conn.commit()
#export
def _to_dict(conn, table_name='data'):
query = """
SELECT objectID, data from {}
GROUP BY objectID
ORDER BY timestamp
""".format(table_name)
c = conn.cursor()
return {key: json.loads(value) for key, value in c.execute(query).fetchall()}
#export
def _row_count(conn, table_name='data'):
query = """
SELECT COUNT(DISTINCT objectID) FROM {}
""".format(table_name)
c = conn.cursor()
res = c.execute(query).fetchone()
return res[0]
#export
def _delete_last(conn, object_id, table_name='data'):
query = """
DELETE FROM {}
WHERE objectId = '{}'
ORDER BY timestamp
LIMIT 1
""".format(table_name, object_id)
c = conn.cursor()
c.execute(query)
conn.commit()
#export
def _delete_all(conn, object_id, table_name='data'):
query = """
DELETE FROM {}
WHERE objectId = '{}'
""".format(table_name, object_id)
c = conn.cursor()
c.execute(query)
conn.commit()
Persistent Storage with history support¶
#export
class AnnotationStorageIterator:
    """Bidirectional iterator over a storage exposing `at(index)`.

    The storage's `at` must raise IndexError past either end; that is
    translated into StopIteration here.
    """

    def __init__(self, annotator_storage):
        self.annotator_storage = annotator_storage
        self.index = 0

    def __next__(self):
        try:
            item = self.annotator_storage.at(self.index)
        except IndexError:
            raise StopIteration
        # Only advance after a successful fetch, so a failed call can be
        # retried (e.g. after more items are added).
        self.index += 1
        return item

    def next(self):
        """Alias for __next__ (convenience for manual stepping)."""
        return self.__next__()

    def prev(self):
        """Step one item back and return it; StopIteration before the start."""
        self.index -= 1
        if self.index < 0:
            raise StopIteration
        return self.annotator_storage.at(self.index)
#export
class AnnotationDBStorage(MutableMapping):
    """SQLite-backed annotation storage with per-object revision history.

    Keys are object ids / file names; values are JSON-serializable
    annotation blobs. Every write appends a revision (see `_insert`);
    reads return the latest revision.
    """

    def __init__(self, conn_string, im_paths=None):
        self.conn = sqlite3.connect(conn_string)
        _create_tables(self.conn)
        if im_paths:
            # Register every known image with an empty annotation.
            self.update({p.name: {} for p in im_paths})

    def update(self, dict_):
        for k, v in dict_.items():
            _insert(self.conn, k, v)

    def __getitem__(self, key):
        item = _get(self.conn, key)
        if item is None:
            # NOTE(review): Mapping convention would be KeyError; IndexError
            # is kept because existing callers may rely on it.
            raise IndexError
        return item

    def get(self, key, default=None):
        """Return the latest annotation for `key`, or `default` if absent.

        Bug fix: the original queried twice and never returned the stored
        value on a hit (it fell through to an implicit None).
        """
        value = _get(self.conn, key)
        return default if value is None else value

    def __delitem__(self, key):
        # Removes only the latest revision (one step of history).
        _delete_last(self.conn, key)

    def delete_all(self, key):
        """Remove the whole revision history for `key`."""
        _delete_all(self.conn, key)

    def at(self, pos):
        """Return the annotation of the object at ordinal position `pos`."""
        # bug fix needed when combined with del operations
        object_id = _get_object_id_at_pos(self.conn, pos)
        if object_id is None or pos < 0:
            raise IndexError
        return _get(self.conn, object_id)

    def __setitem__(self, key, value):
        _insert(self.conn, key, value)

    def __iter__(self):
        return AnnotationStorageIterator(self)

    def __len__(self):
        return _row_count(self.conn)

    def __repr__(self):
        return f"{type(self).__name__}({_list_table(self.conn)[:2] + [' ...']})"

    def to_dict(self):
        """Return {objectID: latest annotation} for all stored objects."""
        return _to_dict(self.conn)