# default_exp helpers

Helpers

Utility helpers shared by the other modules.

augment(np.array([10, 10, 10, 10]))
[8, 13, 7, 10]
#export

from random import choice
from ipyannotator.datasets.factory import DS as NDS
from ipyannotator.datasets.factory_legacy import DS, _combine_train_test
from pathlib import Path
from tqdm.notebook import tqdm


class Tutorial:
    """
    Combines some algorithms to imitate human work with annotators.

    The classification and bbox helpers read the reference annotations from
    ``<project_path>/annotations.json``; the video helper reads
    ``<project_path>/mot.csv``.

    Args:
        dataset: dataset identifier. Unless it is one of
            ``DS.ARTIFICIAL_CLASSIFICATION``, ``DS.ARTIFICIAL_DETECTION`` or
            ``NDS.ARTIFICIAL_VIDEO``, ``_combine_train_test(project_path)`` is
            called on construction.
        project_path: root directory of the tutorial project (``str`` or ``Path``).
    """

    # Zero-based positions (in frame order) of the frames on which
    # `annotate_video_bboxes` keeps only boxes whose height equals their width.
    # On frames after the last one `_mutate_id` replaces the ground-truth ids.
    _CIRCLE_ONLY_FIRST = 4
    _CIRCLE_ONLY_LAST = 8

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, dataset: "Union[DS, NDS]", project_path):
        self.dataset = dataset
        self.project_path = project_path
        if self.dataset not in [DS.ARTIFICIAL_CLASSIFICATION, DS.ARTIFICIAL_DETECTION,
                                NDS.ARTIFICIAL_VIDEO]:
            _combine_train_test(project_path)
        self.all_annotations = Path(project_path) / "annotations.json"

    def _load_reference_annotations(self) -> dict:
        """Return the parsed content of ``self.all_annotations``."""
        with self.all_annotations.open() as f:
            return json.load(f)

    # Random annotator used in image_classification tutorial [create]
    def annotate_randomly(self, annotator, label_noise: float = 0.1):
        """Copy the reference labels into ``annotator.storage`` with some random errors.

        Each reference entry is replaced by a single random label (drawn from
        ``annotator.storage.get_labels()``) with probability ``label_noise``;
        otherwise the reference value is kept. The resulting mapping is kept in
        ``self.filterered`` because `fix_incorrect_annotations` reads it later.

        Args:
            annotator: object exposing a dict-like ``storage`` with
                ``get_labels()``, ``keys()``, ``update()`` and ``save()``.
            label_noise: fraction of entries that receive a random label.
        """
        anno_ = self._load_reference_annotations()

        filt = np.random.uniform(low=0, high=1, size=len(anno_))

        def get_random_class():
            return choice(annotator.storage.get_labels())

        # assign random label for subset of all annotations to imitate human work with
        # <label_noise> amount of errors
        self.filterered = {
            x: [get_random_class()] if f_ < label_noise else y
            for (x, y), f_ in zip(anno_.items(), filt)
        }

        # update ipyannotator's annotations based on previous step and save;
        # storage keys without a reference entry get an empty list
        annotator.storage.update((k, self.filterered.get(k, [])) for k in annotator.storage.keys())
        annotator.storage.save()

    # Annotations fixer used in image_classification tutorial [improve]
    def fix_incorrect_annotations(self, annotator: list):
        """Mark which annotations were spoiled by `annotate_randomly`.

        For every key of each ``storage.annotations`` the stored value becomes
        ``{'answer': <bool>}``, where the flag is ``True`` when the reference
        annotation differs from the value kept in ``self.filterered``.

        `annotate_randomly` must have been called on this instance before,
        otherwise ``self.filterered`` does not exist.

        Args:
            annotator: iterable of annotator objects whose
                ``storage.annotations`` is updated and saved.
        """
        anno_ = self._load_reference_annotations()

        #  mark spoiled on create step, imitating human correction
        for i in tqdm(annotator):
            i.storage.annotations.update(
                (k, {'answer': anno_[k] != self.filterered[k]})
                for k in i.storage.annotations.keys()
            )
            i.storage.annotations.save()

    # Random annotator used in bbox tutorial [create]
    def add_random_bboxes(self, annotator, bbox_noise: float = 0.2):
        """Fill the empty entries of ``annotator.storage`` with (partly distorted) bboxes.

        Entries that already hold a truthy annotation are left untouched. For
        the others, with probability ``bbox_noise`` every reference bbox is
        passed through ``augment`` and stored with empty labels; otherwise the
        reference annotation is stored unchanged. Finally the view is refreshed
        and the annotator's save button is clicked.

        Args:
            annotator: bbox annotator exposing ``storage``, ``controller``,
                ``app_state`` and ``view``.
            bbox_noise: fraction of images that receive distorted bboxes.
        """
        anno_ = self._load_reference_annotations()

        filt = np.random.uniform(low=0, high=1, size=len(anno_))

        # lets randomly annotate each image from code and save annotations;
        # the pairs are materialised first because storage is written to in the loop
        for k, f_ in tqdm(list(zip(annotator.storage.keys(), filt))):
            # do not overwrite existing annotations
            if annotator.storage[k]:
                continue

            if f_ < bbox_noise:
                values = []
                assert isinstance(anno_[k]['bbox'], list)
                for bbox in anno_[k]['bbox']:
                    assert isinstance(bbox, dict)
                    # NOTE(review): relies on the reference bbox dict listing its
                    # values in x, y, width, height order - confirm against the data
                    values.append(
                        dict(
                            zip(
                                ['x', 'y', 'width', 'height'],
                                augment(np.fromiter(bbox.values(), dtype=np.uint64))
                            )
                        )
                    )
                annotator.storage[k] = {'bbox': values, 'labels': [[]]}
            else:
                annotator.storage[k] = anno_[k]

        annotator.controller._update_coords(annotator.app_state.index)  # update screen

        annotator.view._save_btn.click()  # save to file

    # Annotations fixer used in bbox tutorial [improve]
    def fix_incorrect_bboxes(self, improver, creator):
        """Mark which captured bbox annotations differ from the reference.

        Starting from image index 0, and once per image of the improver, every
        entry of ``improver.capture_state.annotations`` is replaced by
        ``{'answer': <bool>}``. The flag is ``True`` when the reference
        annotation of the matching source image differs from the one stored
        by ``creator``. The next button is clicked after each pass.

        The source image path is rebuilt from the path components that follow
        the ``captured`` directory, joined onto ``creator.storage.im_dir``.

        Raises:
            ValueError: if a captured path has no ``captured`` component.
        """
        anno_ = self._load_reference_annotations()

        im_dir_path = creator.storage.im_dir
        improver.app_state.index = 0

        for _ in tqdm(range(improver.app_state.max_im_number)):
            for k in improver.capture_state.annotations:
                capture_im_path = Path(k)

                index = capture_im_path.parts.index('captured') + 1
                new_im_path = im_dir_path.joinpath(*capture_im_path.parts[index:])
                v_expl = anno_.get(str(new_im_path), {})
                v_cret = creator.storage.get(str(new_im_path), {})
                improver.capture_state.annotations[k] = {'answer': v_expl != v_cret}
            improver.view._navi._next_btn.click()

    def annotate_video_bboxes(self, annotator) -> dict:
        """Build per-frame bbox annotations from ``<project_path>/mot.csv`` and save them.

        The nine CSV columns are renamed to frame, id, conf, label, vis, x, y,
        width and height. Rows are grouped per frame, keyed by
        ``<project_path>/images/<zero-padded frame>.jpg``. The result is
        written to ``<project_path>/create_results/annotations.json`` (the
        directory must already exist) and returned.

        Args:
            annotator: not used; kept so existing callers keep working.

        Returns:
            Mapping of image path to ``{'bbox': [...], 'labels': [...]}``.
        """
        # project_path may be a plain str (``__init__`` accepts one)
        project_path = Path(self.project_path)

        mot_gt = pd.read_csv(project_path / 'mot.csv')
        mot_gt.columns = [
            'frame',
            'id',
            'conf',
            'label',
            'vis',
            'x',
            'y',
            'width',
            'height',
        ]
        # sort_values returns a new frame, so the result has to be kept; a stable
        # sort preserves the CSV order of the boxes inside one frame
        mot_gt = mot_gt.sort_values(by=['frame'], kind='mergesort')
        mot_gt['frame'] = mot_gt['frame'].astype(str).str.zfill(4)
        full_path = f'{self.project_path}/images'
        mot_gt['frame'] = mot_gt['frame'].apply(lambda x: full_path + '/' + x + '.jpg')
        mot_gt.index = mot_gt['frame']
        mot_gt = mot_gt.drop(columns=['frame', 'conf', 'label', 'vis'])
        # sort=False keeps the numeric frame order established above
        mot_gt = mot_gt.groupby('frame', sort=False).apply(
            lambda x: x.to_json(orient='records'))
        parsed = json.loads(mot_gt.to_json(orient='index'))

        # Hacky way to occlude the video tutorial avoiding
        # to render indexes on specific frames
        annotations = {}
        for i, (k, v) in enumerate(parsed.items()):
            bboxes = json.loads(v)
            for bbox in bboxes:
                new_id = self._mutate_id(bbox, i)
                # delete before assigning so that 'id' ends up as the last key
                del bbox['id']
                bbox['id'] = new_id

            if self._CIRCLE_ONLY_FIRST <= i <= self._CIRCLE_ONLY_LAST:
                # keep only the circle annotations on these frames
                bboxes = [bbox for bbox in bboxes if bbox['height'] == bbox['width']]

            annotations[k] = {
                'bbox': bboxes,
                'labels': [self._bbox_to_label(bbox) for bbox in bboxes]
            }

        with open(project_path / 'create_results/annotations.json', 'w') as f:
            json.dump(annotations, f)

        return annotations

    def _mutate_id(self, bbox: dict, index: int) -> str:
        """Return the id to store for ``bbox`` on the frame at position ``index``.

        Up to and including the last circle-only frame the original id is kept
        (as a string); afterwards it is ``'1'`` when height equals width and
        ``'2'`` otherwise.
        """
        if index <= self._CIRCLE_ONLY_LAST:
            return str(bbox['id'])
        return '1' if bbox['height'] == bbox['width'] else '2'

    def _bbox_to_label(self, bbox: dict):
        """Return ``['Circle']`` when height equals width, else ``['Rectangle']``."""
        if bbox['height'] == bbox['width']:
            return ['Circle']
        return ['Rectangle']