# default_exp storage

Storage

Project Structure setup

just two folders: the first contains the objects to annotate (e.g. images), the second contains the annotation data/results.

(construct_annotation_path('../data/test_anno_path'),
 construct_annotation_path(file_name='../results/annotations.json'),
 construct_annotation_path(project_path='../test_anno_path', results_dir='outpi'))
(Path('../data/test_anno_path/results/annotations.json'),
 Path('../results/annotations.json'),
 Path('../test_anno_path/outpi/annotations.json'))
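The `construct_annotation_path` helper is defined elsewhere in the library; a minimal sketch consistent with the three calls above (an assumption, not the actual implementation) could look like:

from pathlib import Path

def construct_annotation_path(project_path=None, file_name=None, results_dir=None):
    # an explicit annotation file wins over the project-based default
    if file_name is not None:
        return Path(file_name)
    # otherwise: <project_path>/<results_dir or 'results'>/annotations.json
    return Path(project_path) / (results_dir or 'results') / 'annotations.json'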

Generic Storage for Annotations

A key-value store:

  • key: object_id / file_name

  • value: JSON blob containing the annotation
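For illustration (made-up paths), the mapping looks like:

{
    '../data/images/img1.jpg': {'labels': ['cat']},   # annotated
    '../data/images/img2.jpg': None,                  # not yet annotated
}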

#export

import copy
import json
from collections.abc import MutableMapping


class MapeableStorage(MutableMapping):
    def __init__(self):
        self.mapping = {}

    def __getitem__(self, key):
        return self.mapping[key]

    def __delitem__(self, key):
        # deleting a missing key is a silent no-op
        # (plain dict semantics would raise KeyError)
        if key in self:
            del self.mapping[key]

    def __setitem__(self, key, value):
        self.mapping[key] = value

    def __iter__(self):
        return iter(self.mapping)

    def __len__(self):
        return len(self.mapping)

    def to_dict(self, only_annotated=True):
        if only_annotated:
            return {k: copy.deepcopy(v) for k, v in self.mapping.items() if v}
        else:
            return copy.deepcopy(self.mapping)
m = MapeableStorage()
m.update({'test': 1})
m['test']
1
#export

class AnnotationStorage(MapeableStorage):
    """
    Represents generic storage for annotations.

    `key` is object_id / file_name and `value` - json blob containing annotation.

    im_paths - list of existing images as <Path> objects

    """

    def __init__(self, im_paths):
        super().__init__()
        self.update({str(p): None for p in im_paths})

    def __repr__(self):
        return f"{type(self).__name__}({self.mapping})"

    def save(self, file_name):
        with open(file_name, 'w', encoding='utf-8') as f:
            json.dump(self.mapping, f, ensure_ascii=False, sort_keys=True, indent=4)

    def load(self, file_name):
        with open(file_name) as data_file:
            self.mapping = json.load(data_file)
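A usage sketch (the image paths are made up) showing the JSON round trip:

from pathlib import Path

storage = AnnotationStorage([Path('img1.jpg'), Path('img2.jpg')])
storage['img1.jpg'] = {'labels': ['cat']}
storage.save('annotations.json')

restored = AnnotationStorage([])
restored.load('annotations.json')
assert restored['img1.jpg'] == {'labels': ['cat']}
assert restored['img2.jpg'] is None   # listed but never annotated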

DB backed storage

  • Changes to annotations should be tracked in the DB.

  • db

    • sqlite memory / disk: how to sync so that race conditions are avoided? (see the note after this list)

    • remote db (postgres, mysql, etc.) with an sqlalchemy layer

Write SQLite functions:

  • init db

  • write json + timestamp to db BUT only if json has changed!

  • iterate over db

  • iterate over values with latest timestamp

  • get all history for key

  • allow for metadata?

  • check how sqlite write locks work
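On the locking question: SQLite takes a database-wide lock for each write, so concurrent writers briefly block one another. One common mitigation (an assumption here, not something this notebook does) is WAL mode, which lets readers proceed while a single writer writes:

import sqlite3

conn = sqlite3.connect('annotations.db')
conn.execute('PRAGMA journal_mode=WAL')   # readers no longer block the writer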

# export

import sqlite3
# `AnnotationDBStorage` below calls `_create_tables`; the schema here is
# inferred from the helper functions that follow (an assumption, not
# verified against the original implementation):
#   objects - insertion order of object ids (orderID is auto-incremented)
#   data    - append-only annotation history with timestamps
def _create_tables(conn):
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS objects (
            objectID TEXT,
            orderID  INTEGER PRIMARY KEY AUTOINCREMENT
        )
    """)
    c.execute("""
        CREATE TABLE IF NOT EXISTS data (
            objectID  TEXT,
            timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
            data      JSON,
            author    TEXT
        )
    """)
    conn.commit()

SQL helper functions

An explicit order id is needed for a consistent iteration order.

#export
def _get_order_id(conn, object_id, table_name='objects'):
    # parameter binding avoids breaking on quotes in file names
    query = """
    SELECT orderID FROM {}
    WHERE objectID = ?
    """.format(table_name)
    c = conn.cursor()
    res = c.execute(query, [object_id]).fetchone()
    if res is not None:
        return res[0]
#export
def _create_order_id(conn, object_id, table_name='objects'):
    order_id = _get_order_id(conn, object_id, table_name=table_name)
    if order_id:
        return order_id
    query = """
    INSERT INTO {}('objectID') VALUES(?)
    """.format(table_name)
    c = conn.cursor()
    c.execute(query, [object_id])
    return _get_order_id(conn, object_id, table_name=table_name)
#export
def _get(conn, object_id, table_name='data'):
    # return the latest annotation for the given object;
    # rowid breaks ties between entries with identical timestamps
    query = """
    SELECT data FROM {}
    WHERE objectID = ?
    ORDER BY timestamp DESC, rowid DESC
    LIMIT 1
    """.format(table_name)
    c = conn.cursor()
    res = c.execute(query, [object_id]).fetchone()
    if res is not None:
        return json.loads(res[0])
#export
def _get_object_id_at_pos(conn, pos, table_name='objects'):
    query = """
    SELECT objectID FROM {}
    ORDER BY orderID
    LIMIT 1 OFFSET ?
    """.format(table_name)
    c = conn.cursor()
    res = c.execute(query, [pos]).fetchone()
    if res is not None:
        return res[0]
#export
def _insert(conn, object_id, data: dict, table_name='data', author='author'):
    # append a new history entry, but only if the annotation has changed
    last = _get(conn, object_id)
    _create_order_id(conn, object_id)
    if data == last:
        return
    c = conn.cursor()
    c.execute("INSERT INTO {}('objectID', 'author', 'data') VALUES (?, ?, ?)".format(table_name),
              [object_id, author, json.dumps(data)])
    conn.commit()
#export
def _to_dict(conn, table_name='data'):
    # latest annotation per object; with MAX(timestamp) in the select,
    # SQLite fills the bare `data` column from the max-timestamp row
    query = """
    SELECT objectID, data, MAX(timestamp) FROM {}
    GROUP BY objectID
    """.format(table_name)
    c = conn.cursor()
    return {key: json.loads(value) for key, value, _ in c.execute(query).fetchall()}
#export
def _row_count(conn, table_name='data'):
    query = """
    SELECT COUNT(DISTINCT objectID) FROM {}
    """.format(table_name)
    c = conn.cursor()
    res = c.execute(query).fetchone()
    return res[0]
#export
def _delete_last(conn, object_id, table_name='data'):
    # delete the most recent history entry for this object; the rowid
    # subquery works without the optional DELETE ... LIMIT compile flag
    query = """
    DELETE FROM {}
    WHERE rowid = (
        SELECT rowid FROM {}
        WHERE objectID = ?
        ORDER BY timestamp DESC, rowid DESC
        LIMIT 1
    )
    """.format(table_name, table_name)
    c = conn.cursor()
    c.execute(query, [object_id])
    conn.commit()
#export
def _delete_all(conn, object_id, table_name='data'):
    query = """
    DELETE FROM {}
    WHERE objectID = ?
    """.format(table_name)
    c = conn.cursor()
    c.execute(query, [object_id])
    conn.commit()
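`AnnotationDBStorage.__repr__` below also calls a `_list_table` helper that never appears in this notebook; a minimal sketch consistent with the other helpers (an assumption, not the original implementation):

#export
def _list_table(conn, table_name='data'):
    # one (objectID, data) row per object, keeping the latest entry;
    # SQLite fills the bare `data` column from the MAX(timestamp) row
    query = """
    SELECT objectID, data, MAX(timestamp) FROM {}
    GROUP BY objectID
    """.format(table_name)
    c = conn.cursor()
    return [(key, json.loads(value)) for key, value, _ in c.execute(query).fetchall()]

A quick smoke test of the helpers against an in-memory database:

conn = sqlite3.connect(':memory:')
_create_tables(conn)
_insert(conn, 'img1.jpg', {'labels': ['cat']})
_insert(conn, 'img1.jpg', {'labels': ['cat']})   # unchanged, so no new row
assert _get(conn, 'img1.jpg') == {'labels': ['cat']}
assert _row_count(conn) == 1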

Persistent Storage with history support

#export

class AnnotationStorageIterator:
    def __init__(self, annotator_storage):
        self.annotator_storage = annotator_storage
        self.index = 0

    def __next__(self):
        try:
            result = self.annotator_storage.at(self.index)
            self.index += 1
        except IndexError:
            raise StopIteration
        return result

    def next(self):
        return self.__next__()

    def prev(self):
        self.index -= 1
        if self.index < 0:
            raise StopIteration
        return self.annotator_storage.at(self.index)
#export

class AnnotationDBStorage(MutableMapping):
    def __init__(self, conn_string, im_paths=None):
        self.conn = sqlite3.connect(conn_string)
        _create_tables(self.conn)
        if im_paths:
            self.update({p.name: {} for p in im_paths})

    def update(self, dict_):
        for k, v in dict_.items():
            _insert(self.conn, k, v)

    def __getitem__(self, key):
        item = _get(self.conn, key)
        if item is None:
            raise KeyError(key)
        return item

    def get(self, key, default=None):
        # return the stored annotation, or `default` for unknown objects
        item = _get(self.conn, key)
        return default if item is None else item

    def __delitem__(self, key):
        _delete_last(self.conn, key)

    def delete_all(self, key):
        _delete_all(self.conn, key)

    def at(self, pos):
        # bug fix needed when combined with del operations
        object_id = _get_object_id_at_pos(self.conn, pos)
        if object_id is None or pos < 0:
            raise IndexError
        return _get(self.conn, object_id)

    def __setitem__(self, key, value):
        _insert(self.conn, key, value)

    def __iter__(self):
        # yields annotation values in orderID order (not keys, as a
        # regular Mapping would) to support next/prev navigation
        return AnnotationStorageIterator(self)

    def __len__(self):
        return _row_count(self.conn)

    def __repr__(self):
        return f"{type(self).__name__}({_list_table(self.conn)[:2] + [' ...']})"

    def to_dict(self):
        return _to_dict(self.conn)
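A usage sketch against an in-memory database (the image names are made up):

from pathlib import Path

db = AnnotationDBStorage(':memory:', im_paths=[Path('img1.jpg'), Path('img2.jpg')])
db['img1.jpg'] = {'labels': ['cat']}      # appends a new history entry
assert db['img1.jpg'] == {'labels': ['cat']}
assert len(db) == 2

it = iter(db)
it.next()      # annotation at position 0
it.prev()      # step back to the same position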