import difflib
import os
import random
import re
from typing import Dict, Optional
import numpy as np
import data_juicer
from data_juicer.ops.base_op import OPERATORS, TAGGING_OPS, UNFORKABLE, Mapper
from data_juicer.ops.load import load_ops
from data_juicer.ops.op_fusion import LOADED_IMAGES
from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE
from data_juicer.utils.constant import Fields, MetaKeys
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import SpecialTokens
cv2 = LazyLoader('cv2', 'cv2')
nltk = LazyLoader('nltk', 'nltk')


def is_noun(word):
    """Return True if NLTK's POS tagger tags ``word`` as a noun."""
    pos_tagged = nltk.pos_tag([word])
    pos = pos_tagged[0][1]
    if pos not in ['NN', 'NNS', 'NNP', 'NNPS']:
        return False
    return True


def compare_text_index(text1, text2):
    """Diff two captions word by word and return the lemmatized nouns that
    appear only in ``text1`` and only in ``text2``, respectively."""
    text1_split = []
    text2_split = []
    lemmatizer = nltk.stem.WordNetLemmatizer()
    d = difflib.Differ()
    diff = d.compare(
        re.sub(r'[^\w\s]', '',
               text1.lower().replace(' ', '\n')).splitlines(),
        re.sub(r'[^\w\s]', '',
               text2.lower().replace(' ', '\n')).splitlines())
    for line in diff:
        if line.startswith('+'):
            text2_split.append(lemmatizer.lemmatize(line.replace('+ ', '')))
        elif line.startswith('-'):
            text1_split.append(lemmatizer.lemmatize(line.replace('- ', '')))

    text1 = []
    text2 = []
    for temp_idx, temp_word1 in enumerate(text1_split):
        if temp_word1 not in text2_split:
            if is_noun(temp_word1):
                text1.append(temp_word1)
    for temp_idx, temp_word2 in enumerate(text2_split):
        if temp_word2 not in text1_split:
            if is_noun(temp_word2):
                text2.append(temp_word2)
    return text1, text2
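

def _example_compare_text_index():
    # Illustrative sketch only, not part of the original operator. Assuming
    # the NLTK 'averaged_perceptron_tagger' and 'wordnet' resources are
    # available, two captions that differ in a single object should yield the
    # noun unique to each caption, i.e. (['dog'], ['cat']) for the pair below.
    return compare_text_index('a dog sits on the grass',
                              'a cat sits on the grass')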


def iou_filter(samples, iou_thresh):
    """Greedy IoU-based de-duplication of (center_x, center_y, w, h) boxes:
    keep a box and drop every later box that overlaps it with an IoU above
    ``iou_thresh``."""
    x1 = samples[:, 0] - samples[:, 2] / 2
    y1 = samples[:, 1] - samples[:, 3] / 2
    x2 = samples[:, 0] + samples[:, 2] / 2
    y2 = samples[:, 1] + samples[:, 3] / 2
    areas = (y2 - y1 + 1) * (x2 - x1 + 1)

    keep_boxes = []
    index = np.arange(len(samples))
    while len(index) > 0:
        i = index[0]
        keep_boxes.append(i)
        x1_overlap = np.maximum(x1[i], x1[index[1:]])
        y1_overlap = np.maximum(y1[i], y1[index[1:]])
        x2_overlap = np.minimum(x2[i], x2[index[1:]])
        y2_overlap = np.minimum(
            y2[i], y2[index[1:]])  # len(y2_overlap) == len(index) - 1
        w = np.maximum(0, x2_overlap - x1_overlap + 1)
        h = np.maximum(0, y2_overlap - y1_overlap + 1)
        overlap_area = w * h
        ious = overlap_area / (areas[i] + areas[index[1:]] - overlap_area)
        idx = np.where(ious <= iou_thresh)[0]
        index = index[idx + 1]  # keep only the boxes below the IoU threshold
    return samples[keep_boxes]
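

def _example_iou_filter():
    # Illustrative sketch only, not part of the original operator. Boxes are
    # (center_x, center_y, width, height): the first two boxes below overlap
    # with IoU > 0.5, so only the first of them is kept, while the disjoint
    # third box survives as well.
    boxes = np.array([
        [50.0, 50.0, 40.0, 40.0],
        [52.0, 52.0, 40.0, 40.0],
        [150.0, 150.0, 40.0, 40.0],
    ])
    return iou_filter(boxes, iou_thresh=0.5)  # -> boxes[[0, 2]]

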
OP_NAME = 'imgdiff_difference_area_generator_mapper'
@UNFORKABLE.register_module(OP_NAME)
@TAGGING_OPS.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
@LOADED_IMAGES.register_module(OP_NAME)
class Difference_Area_Generator_Mapper(Mapper):
"""A fused operator for OPs that is used to run sequential OPs on
the same batch to allow fine-grained control on data processing."""
_accelerator = 'cuda'

    def __init__(self,
                 image_pair_similarity_filter_args: Optional[Dict] = {},
                 image_segment_mapper_args: Optional[Dict] = {},
                 image_text_matching_filter_args: Optional[Dict] = {},
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)

        self.FIXED_ARGS = {}
        self.FIXED_ARGS['image_pair_similarity_filter'] = {
            'min_score_1': 0.1,
            'max_score_1': 1.0,
            'min_score_2': 0.1,
            'max_score_2': 1.0,
            'hf_clip': 'openai/clip-vit-base-patch32',
            'num_proc': 1
        }
        self.FIXED_ARGS['image_segment_mapper'] = {
            'imgsz': 1024,
            'conf': 0.05,
            'iou': 0.5,
            'model_path': 'FastSAM-x.pt',
        }
        self.FIXED_ARGS['image_text_matching_filter'] = {
            'min_score': 0.1,
            'max_score': 1.0,
            'hf_blip': 'Salesforce/blip-itm-base-coco',
            'num_proc': 1
        }

        self.image_pair_similarity_filter_args = self._prepare_op_args(
            'image_pair_similarity_filter', image_pair_similarity_filter_args)
        self.image_segment_mapper_args = self._prepare_op_args(
            'image_segment_mapper', image_segment_mapper_args)
        self.image_text_matching_filter_args = self._prepare_op_args(
            'image_text_matching_filter', image_text_matching_filter_args)

        self.fused_op_list = [{
            'image_pair_similarity_filter':
            self.image_pair_similarity_filter_args
        }, {
            'image_segment_mapper': self.image_segment_mapper_args
        }, {
            'image_text_matching_filter':
            self.image_text_matching_filter_args
        }]
        self.fused_ops = load_ops(self.fused_op_list)
        self._name = 'Difference_Area_Generator_Mapper:(%s)' % ','.join(
            [op._name for op in self.fused_ops])

        # set accelerator to 'cuda' if there exists any op whose accelerator
        # is 'cuda'
        accelerator_methods = set([op.accelerator for op in self.fused_ops])
        if 'cuda' in accelerator_methods:
            self.accelerator = 'cuda'

        # update num_proc with the min num_proc of all fusible filters
        self.num_proc = min([op.runtime_np() for op in self.fused_ops]) \
            if self.fused_ops else 1

    def _prepare_op_args(self, op_name, args_dict):
        # work on a copy so that the mutable default argument dicts declared
        # in __init__ are never modified in place
        args_dict = dict(args_dict or {})
        for key in self.FIXED_ARGS[op_name]:
            if key not in args_dict:
                args_dict[key] = self.FIXED_ARGS[op_name][key]
        args_dict['accelerator'] = self.accelerator
        return args_dict

    def process_single(self, samples, rank=None):
        random_num = str(random.random()).split('.')[-1]

        self.fused_ops[0].min_score = self.image_pair_similarity_filter_args[
            'min_score_1']
        self.fused_ops[0].max_score = self.image_pair_similarity_filter_args[
            'max_score_1']
        if not os.path.exists(DATA_JUICER_ASSETS_CACHE):
            os.makedirs(DATA_JUICER_ASSETS_CACHE, exist_ok=True)

        # fused_ops: 1. image_pair_similarity_filter, 2. image_segment_mapper,
        # 3. image_text_matching_filter.
        # Expected keys of the input sample: "caption1", "caption2",
        # "image_path1", "image_path2".

        # Step1: filter out image pairs with large differences between the two
        # images.
        temp_sample = {}
        temp_sample['text'] = 'temp image pairs ' + random_num
        temp_sample['images'] = [
            samples['image_path1'], samples['image_path2']
        ]
        temp_sample = data_juicer.core.NestedDataset.from_list([temp_sample])
        if Fields.stats not in temp_sample.features:
            temp_sample = temp_sample.add_column(name=Fields.stats,
                                                 column=[{}] *
                                                 temp_sample.num_rows)
        new_samples_s1 = self.fused_ops[0].compute_stats_single(temp_sample[0],
                                                                rank=rank)
        new_samples_s1 = self.fused_ops[0].process_single(new_samples_s1,
                                                          rank=rank)
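        # The similarity filter's process_single() returns a keep decision, so
        # a falsy result means the two images are too dissimilar to form a
        # valid pair; in that case a dummy all-zero bbox is returned below.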
        if not new_samples_s1:
            return {
                Fields.meta: {
                    MetaKeys.bbox_tag: np.zeros((1, 4), dtype=np.float32)
                }
            }

        # Step2: compare the differences between the two captions and identify
        # the "valid object".
        valid_object1, valid_object2 = compare_text_index(
            samples['caption1'], samples['caption2'])

        # Step3: segment the regions in two images that may contain valid
        # objects.
        temp_sample = {}
        temp_sample['images'] = [
            samples['image_path1'], samples['image_path2']
        ]
        temp_sample = data_juicer.core.NestedDataset.from_list([temp_sample])
        if Fields.meta not in temp_sample.features:
            temp_sample = temp_sample.add_column(name=Fields.meta,
                                                 column=[{}] *
                                                 temp_sample.num_rows)
        new_samples_s2 = self.fused_ops[1].process_single(temp_sample[0],
                                                          rank=rank)
        image1_bboxes = new_samples_s2[Fields.meta][MetaKeys.bbox_tag][0]
        image2_bboxes = new_samples_s2[Fields.meta][MetaKeys.bbox_tag][1]
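        # Each bbox produced by the segment mapper is interpreted as
        # (center_x, center_y, width, height) in pixels, which is why the
        # crops below slice from center -/+ half of the width/height.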

        # Step4: crop sub-images based on the bounding boxes for subsequent
        # image-text matching processes.
        crop_image1_samples = []
        crop_image2_samples = []
        # Used to associate crop file names with bounding boxes.
        crop_image1_path_to_bbox_dict = {}
        crop_image2_path_to_bbox_dict = {}
        image_array1 = cv2.imread(samples['image_path1'])
        image_array2 = cv2.imread(samples['image_path2'])
        image_array2 = cv2.resize(
            image_array2, (image_array1.shape[1], image_array1.shape[0]))
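        # Each bbox is cropped from BOTH images ("crop 1, 2" / "crop 2, 1"
        # below); the crop file names encode the source image (_1_/_2_) and
        # the bbox id in <...>, so the two crops of one region can be
        # re-paired in Step5 and Step6.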
        # bbox from 1 -> crop 1, 2
        for temp_bbox_id, temp_bbox in enumerate(image1_bboxes):
            crop_img = image_array1[
                int(temp_bbox[1] - temp_bbox[3] / 2):int(temp_bbox[1] +
                                                         temp_bbox[3] / 2),
                int(temp_bbox[0] - temp_bbox[2] / 2):int(temp_bbox[0] +
                                                         temp_bbox[2] / 2), :]
            valid_img_name = samples['image_path1'].split('/')[-1].split(
                '.')[-2]
            temp_image_path = os.path.join(
                DATA_JUICER_ASSETS_CACHE,
                f'cropped_images_{valid_img_name}_{random_num}_1_'
                f'<{str(temp_bbox_id)}>.jpg')
            cv2.imwrite(temp_image_path, crop_img)
            crop_image1_path_to_bbox_dict[temp_image_path] = temp_bbox
            for temp_valid_object1 in valid_object1:
                crop_image1_samples.append({
                    'text':
                    f'{SpecialTokens.image}' + temp_valid_object1 +
                    f'{SpecialTokens.eoc} ',
                    'images': [temp_image_path]
                })

            crop_img_another = image_array2[
                int(temp_bbox[1] - temp_bbox[3] / 2):int(temp_bbox[1] +
                                                         temp_bbox[3] / 2),
                int(temp_bbox[0] - temp_bbox[2] / 2):int(temp_bbox[0] +
                                                         temp_bbox[2] / 2), :]
            valid_img_name = samples['image_path2'].split('/')[-1].split(
                '.')[-2]
            temp_image_path = os.path.join(
                DATA_JUICER_ASSETS_CACHE,
                f'cropped_images_{valid_img_name}_{random_num}_2_'
                f'<{str(temp_bbox_id)}>.jpg')
            cv2.imwrite(temp_image_path, crop_img_another)
            crop_image2_path_to_bbox_dict[temp_image_path] = temp_bbox
            for temp_valid_object2 in valid_object2:
                crop_image2_samples.append({
                    'text':
                    f'{SpecialTokens.image}' + temp_valid_object2 +
                    f'{SpecialTokens.eoc} ',
                    'images': [temp_image_path]
                })

        # bbox from 2 -> crop 2, 1
        for temp_bbox_id, temp_bbox in enumerate(image2_bboxes):
            temp_crop_image_pair_id = len(image1_bboxes) + temp_bbox_id
            crop_img = image_array2[
                int(temp_bbox[1] - temp_bbox[3] / 2):int(temp_bbox[1] +
                                                         temp_bbox[3] / 2),
                int(temp_bbox[0] - temp_bbox[2] / 2):int(temp_bbox[0] +
                                                         temp_bbox[2] / 2), :]
            valid_img_name = samples['image_path2'].split('/')[-1].split(
                '.')[-2]
            temp_image_path = os.path.join(
                DATA_JUICER_ASSETS_CACHE,
                f'cropped_images_{valid_img_name}_{random_num}_2_'
                f'<{str(temp_crop_image_pair_id)}>.jpg')
            cv2.imwrite(temp_image_path, crop_img)
            crop_image2_path_to_bbox_dict[temp_image_path] = temp_bbox
            for temp_valid_object2 in valid_object2:
                crop_image2_samples.append({
                    'text':
                    f'{SpecialTokens.image}' + temp_valid_object2 +
                    f'{SpecialTokens.eoc} ',
                    'images': [temp_image_path]
                })

            crop_img_another = image_array1[
                int(temp_bbox[1] - temp_bbox[3] / 2):int(temp_bbox[1] +
                                                         temp_bbox[3] / 2),
                int(temp_bbox[0] - temp_bbox[2] / 2):int(temp_bbox[0] +
                                                         temp_bbox[2] / 2), :]
            valid_img_name = samples['image_path1'].split('/')[-1].split(
                '.')[-2]
            temp_image_path = os.path.join(
                DATA_JUICER_ASSETS_CACHE,
                f'cropped_images_{valid_img_name}_{random_num}_1_'
                f'<{str(temp_crop_image_pair_id)}>.jpg')
            cv2.imwrite(temp_image_path, crop_img_another)
            crop_image1_path_to_bbox_dict[temp_image_path] = temp_bbox
            for temp_valid_object1 in valid_object1:
                crop_image1_samples.append({
                    'text':
                    f'{SpecialTokens.image}' + temp_valid_object1 +
                    f'{SpecialTokens.eoc} ',
                    'images': [temp_image_path]
                })
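
        # Step5 runs the BLIP-based image-text matching filter over every
        # (crop, noun) pair built above; crops that do not match any of their
        # candidate objects are dropped by .filter() below.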
        # Step5: determine whether the sub-images contain valid objects.
        crop_image1_samples = data_juicer.core.NestedDataset.from_list(
            crop_image1_samples)
        if Fields.stats not in crop_image1_samples.features:
            crop_image1_samples = crop_image1_samples.add_column(
                name=Fields.stats, column=[{}] * crop_image1_samples.num_rows)
        crop_image1_filtered = crop_image1_samples.map(
            self.fused_ops[2].compute_stats,
            num_proc=self.image_text_matching_filter_args['num_proc'],
            with_rank=True)
        crop_image1_filtered = crop_image1_filtered.filter(
            self.fused_ops[2].process,
            num_proc=self.image_text_matching_filter_args['num_proc'])
        crop_image1_filtered = crop_image1_filtered.to_list()

        crop_image2_samples = data_juicer.core.NestedDataset.from_list(
            crop_image2_samples)
        if Fields.stats not in crop_image2_samples.features:
            crop_image2_samples = crop_image2_samples.add_column(
                name=Fields.stats, column=[{}] * crop_image2_samples.num_rows)
        crop_image2_filtered = crop_image2_samples.map(
            self.fused_ops[2].compute_stats,
            num_proc=self.image_text_matching_filter_args['num_proc'],
            with_rank=True)
        crop_image2_filtered = crop_image2_filtered.filter(
            self.fused_ops[2].process,
            num_proc=self.image_text_matching_filter_args['num_proc'])
        crop_image2_filtered = crop_image2_filtered.to_list()

        crop_image2_filtered_bbox_id = []
        seen = []
        for temp_crop_image2_filtered in crop_image2_filtered:
            crop_image2_filtered_bbox_id.append(
                temp_crop_image2_filtered['images'][0].split('_')[-1].split(
                    '.')[-2])
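
        # Keep only the bbox ids whose crops from BOTH images passed the
        # matching filter; the id is recovered from the ..._<bbox_id>.jpg
        # suffix of the crop file name, and `seen` avoids duplicate pairs.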
        filtered_sub_image_pairs = []
        for temp_crop_image1_filtered in crop_image1_filtered:
            temp_bbox_id = temp_crop_image1_filtered['images'][0].split(
                '_')[-1].split('.')[-2]
            if temp_bbox_id in seen:
                continue
            if temp_bbox_id in crop_image2_filtered_bbox_id:
                seen.append(temp_bbox_id)
                temp_filtered_sub_image_pairs = {}
                temp_filtered_sub_image_pairs['text'] = temp_bbox_id
                valid_image_path1 = samples['image_path1'].split(
                    '/')[-1].split('.')[-2]
                valid_image_path2 = samples['image_path2'].split(
                    '/')[-1].split('.')[-2]
                temp_filtered_sub_image_pairs['images'] = [
                    temp_crop_image1_filtered['images'][0],
                    temp_crop_image1_filtered['images'][0].replace(
                        valid_image_path1,
                        valid_image_path2).replace('_1_<', '_2_<')
                ]
                filtered_sub_image_pairs.append(temp_filtered_sub_image_pairs)

        # Step6: determine whether there are differences in the two images
        # corresponding to each bounding box.
        filtered_sub_image_pairs = data_juicer.core.NestedDataset.from_list(
            filtered_sub_image_pairs)
        self.fused_ops[0].min_score = self.image_pair_similarity_filter_args[
            'min_score_2']
        self.fused_ops[0].max_score = self.image_pair_similarity_filter_args[
            'max_score_2']
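        # The similarity filter is reused here with the second threshold pair
        # (min_score_2/max_score_2): sub-image pairs whose similarity falls
        # inside this range are kept as candidate difference regions.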
        if Fields.stats not in filtered_sub_image_pairs.features:
            filtered_sub_image_pairs = filtered_sub_image_pairs.add_column(
                name=Fields.stats,
                column=[{}] * filtered_sub_image_pairs.num_rows)
        filtered_sub_image_pairs = filtered_sub_image_pairs.map(
            self.fused_ops[0].compute_stats,
            num_proc=self.image_pair_similarity_filter_args['num_proc'],
            with_rank=True)
        filtered_sub_image_pairs = filtered_sub_image_pairs.filter(
            self.fused_ops[0].process,
            num_proc=self.image_pair_similarity_filter_args['num_proc'])
        filtered_sub_image_pairs = filtered_sub_image_pairs.to_list()

        if len(filtered_sub_image_pairs) == 0:
            for temp_image_path in crop_image1_path_to_bbox_dict:
                os.remove(temp_image_path)
            for temp_image_path in crop_image2_path_to_bbox_dict:
                os.remove(temp_image_path)
            return {
                Fields.meta: {
                    MetaKeys.bbox_tag: np.zeros((1, 4), dtype=np.float32)
                }
            }

        filtered_bboxes = []
        for temp_sub_image_pairs in filtered_sub_image_pairs:
            filtered_bboxes.append(crop_image1_path_to_bbox_dict[
                temp_sub_image_pairs['images'][0]])
        filtered_bboxes = np.array(filtered_bboxes)

        # Step7: remove overlapping bounding boxes.
        iou_thresh = 0.5
        filtered_bboxes = iou_filter(filtered_bboxes, iou_thresh)
        samples[Fields.meta] = {}
        samples[Fields.meta][MetaKeys.bbox_tag] = filtered_bboxes

        # Step8: clear the cache
        for temp_image_path in crop_image1_path_to_bbox_dict:
            os.remove(temp_image_path)
        for temp_image_path in crop_image2_path_to_bbox_dict:
            os.remove(temp_image_path)
        return samples
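

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original operator. Assumptions:
    # the two image files exist, and the CLIP/FastSAM/BLIP weights used by
    # the fused OPs can be loaded (downloaded or cached locally). The input
    # schema is the one process_single() expects: two image paths plus their
    # captions. All paths and captions below are placeholders.
    op = Difference_Area_Generator_Mapper()
    sample = {
        'image_path1': 'path/to/image1.jpg',
        'image_path2': 'path/to/image2.jpg',
        'caption1': 'a dog sits on the grass',
        'caption2': 'a cat sits on the grass',
    }
    result = op.process_single(sample)
    # Bounding boxes of the differing regions as (center_x, center_y, w, h);
    # an all-zero row means no valid difference area was found.
    print(result[Fields.meta][MetaKeys.bbox_tag])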