Source code for KGE.data_utils

import logging
import os
import scipy
import numpy as np
import pandas as pd
import tensorflow as tf

from tqdm import tqdm
from .utils import check_path_exist_and_create


class myIter:
    """Iterator adapter that stacks every item of a wrapped iterator.

    Each item pulled from the wrapped iterator is expected to be a sequence
    of tensors; the adapter returns ``tf.stack(item, axis=1)`` so the
    individual tensors become the columns of a single tensor.
    """

    def __init__(self, iter_obj):
        """Store the iterator to wrap.

        Parameters
        ----------
        iter_obj : iterator
            iterator whose :code:`next()` yields a sequence of tensors
        """
        self.iter_obj = iter_obj

    def __iter__(self):
        # The iterator protocol requires __iter__ to return an iterator;
        # this object implements __next__, so it is its own iterator.
        return self

    def __next__(self):
        # StopIteration raised by the wrapped iterator propagates unchanged.
        return tf.stack(next(self.iter_obj), axis=1)
def index_kg(kg_data):
    """Index the Knowledge Graph data.

    Parameters
    ----------
    kg_data : np.array or str
        KG data to be indexed. Either an array whose columns 0 and 2 hold
        entities and whose column 1 holds relations, or the path of a folder
        containing header-less CSV files with the same column layout.

    Returns
    -------
    dict
        metadata of KG <br>
        :code:`'ent2ind'`: dictionary that map entity to index
        :code:`'ind2ent'`: list that map index to entity
        :code:`'rel2ind'`: dictionary that map relation to index
        :code:`'ind2rel'`: list that map index to relation
    """
    if isinstance(kg_data, np.ndarray):
        # np.unique returns sorted unique values, so indices follow sort order.
        entities = list(np.unique(np.append(kg_data[:, 0], kg_data[:, 2])))
        relations = list(np.unique(kg_data[:, 1]))
    else:
        entity_parts = []
        relation_parts = []
        # os.listdir order is arbitrary and indices are assigned in order of
        # first appearance, so read the files in sorted order to make the
        # resulting indices reproducible.
        for name in sorted(os.listdir(kg_data)):
            part = pd.read_csv(os.path.join(kg_data, name), header=None, dtype=str)
            entity_parts.append(part.iloc[:, 0])
            entity_parts.append(part.iloc[:, 2])
            relation_parts.append(part.iloc[:, 1])

        # Series.append was removed in pandas 2.0; concatenate once instead.
        # pd.concat rejects an empty list, so an empty folder maps to no items.
        if entity_parts:
            entities = list(pd.unique(pd.concat(entity_parts, ignore_index=True)))
            relations = list(pd.unique(pd.concat(relation_parts, ignore_index=True)))
        else:
            entities = []
            relations = []

    ent2ind = {e: i for i, e in enumerate(entities)}
    rel2ind = {r: i for i, r in enumerate(relations)}

    return {
        "ent2ind": ent2ind,
        "ind2ent": list(entities),
        "rel2ind": rel2ind,
        "ind2rel": list(relations),
    }
def convert_kg_to_index(kg_data, ent2ind, rel2ind):
    """Convert the KG data into index.

    Parameters
    ----------
    kg_data : np.array or str
        KG data to be converted. Either an array whose columns 0, 1, 2 are
        mapped through :code:`ent2ind`, :code:`rel2ind`, :code:`ent2ind`
        respectively, or the path of a folder of header-less CSV files with
        the same column layout.
    ent2ind : dict
        dictionary that map entity to index
    rel2ind : dict
        dictionary that map relation to index

    Returns
    -------
    np.array or None
        indexed KG data when :code:`kg_data` is an array. When
        :code:`kg_data` is a folder path nothing is returned: every file is
        written, converted, under the same name into the sibling folder
        :code:`kg_data + "_indexed"`.

    Notes
    -----
    Names missing from the dictionaries are not rejected: they become
    :code:`None` in the returned array and empty cells in the written files.
    """
    if isinstance(kg_data, np.ndarray):
        h = [ent2ind.get(e) for e in kg_data[:, 0]]
        r = [rel2ind.get(e) for e in kg_data[:, 1]]
        t = [ent2ind.get(e) for e in kg_data[:, 2]]
        return np.array([h, r, t]).T

    out_dir = kg_data + "_indexed"
    check_path_exist_and_create(out_dir)
    for name in os.listdir(kg_data):
        frame = pd.read_csv(os.path.join(kg_data, name), header=None, dtype=str)
        for position, mapping in enumerate((ent2ind, rel2ind, ent2ind)):
            col = frame.columns[position]
            # Replace the whole column instead of assigning through iloc:
            # an in-place write would have to store integers inside a
            # string-typed column. Nullable Int64 keeps the indices integral
            # in the CSV even when an unknown name maps to a missing value.
            frame[col] = frame[col].map(mapping).astype("Int64")
        frame.to_csv(os.path.join(out_dir, name), index=False, header=False)

    # Lazy %-style arguments; the rendered message is unchanged.
    logging.info("indexed_kg has been save to %s", out_dir)
def train_test_split_no_unseen(X, test_size, seed):
    """Split KG data into train and test.

    Split KG data into train and test, this function guarantees that the
    entities and relations in test data are also present in the train data.

    Parameters
    ----------
    X : np.array
        KG data to be splitted; columns 0 and 2 are treated as entities and
        column 1 as relations
    test_size : int or float
        desired test size, if :code:`int`, represents the absolute test size,
        if :code:`float`, represents the relative proportion.
    seed : int
        random seed

    Returns
    -------
    np.array, np.array
        splitted train, test KG data. If the requested test size cannot be
        reached within :code:`len(X) * 10` random draws, an error is logged
        and :code:`None` is returned instead.
    """
    if isinstance(test_size, float):
        test_size = int(len(X) * test_size)

    ent, ent_cnt = np.unique(np.append(X[:, 0], X[:, 2]), return_counts=True)
    rel, rel_cnt = np.unique(X[:, 1], return_counts=True)
    # Remaining number of occurrences of each entity / relation in train.
    e_dict = dict(zip(ent, ent_cnt))
    r_dict = dict(zip(rel, rel_cnt))

    all_ids = np.arange(len(X))
    chosen = set()
    max_loop = len(X) * 10
    rnd = np.random.RandomState(seed)

    pbar = tqdm(total=test_size, desc="test size", leave=True)
    try:
        for _ in range(max_loop):
            if len(chosen) >= test_size:
                break

            i = int(rnd.choice(all_ids))
            if i in chosen:
                # Already moved to test: its occurrences were removed from
                # the counters once and must not be removed again.
                continue

            h, r, t = X[i, 0], X[i, 1], X[i, 2]
            if h == t:
                # A self-loop contributes two occurrences of the same entity,
                # so at least one more occurrence must remain afterwards.
                removable = e_dict[h] > 2
            else:
                removable = e_dict[h] > 1 and e_dict[t] > 1

            if removable and r_dict[r] > 1:
                e_dict[h] -= 1
                e_dict[t] -= 1
                r_dict[r] -= 1
                chosen.add(i)
                pbar.update(1)
                pbar.refresh()

        if len(chosen) < test_size:
            logging.error("Cannot split a test set with desired size, please reduce the test size")
            return None
    finally:
        # Close the progress bar on the failure path as well.
        pbar.close()

    test_id = np.array(sorted(chosen), dtype=int)
    train_id = np.setdiff1d(all_ids, test_id)

    return X[train_id], X[test_id]
def calculate_data_size(X):
    """Return the number of samples held by ``X``.

    Parameters
    ----------
    X : str or sized object
        if :code:`str`, the path of a folder of header-less CSV files whose
        row counts are summed; otherwise any object supporting :code:`len()`

    Returns
    -------
    int
        total number of rows
    """
    # Anything that is not a folder path is measured directly.
    if not isinstance(X, str):
        return len(X)

    row_total = 0
    for entry in os.listdir(X):
        # header=None: the first line of every file counts as a data row
        row_total += len(pd.read_csv(X + "/" + entry, header=None))
    return row_total
def set_tf_iterator(data, batch_size, shuffle, buffer_size=None, seed=None):
    """Build an endlessly repeating, batched iterator over the data.

    Parameters
    ----------
    data : str or array-like
        if :code:`str`, the path of a folder of CSV files that are parsed as
        three :code:`tf.int32` columns; otherwise data accepted by
        :code:`tf.data.Dataset.from_tensor_slices`
    batch_size : int
        number of samples per batch
    shuffle : bool
        whether to shuffle the samples (and, for a folder, the file order)
    buffer_size : int, optional
        shuffle buffer size, required when :code:`shuffle` is True
    seed : int, optional
        random seed used for shuffling

    Returns
    -------
    iterator
        iterator over batches. For a folder input the per-column tensors of
        each batch are stacked along axis 1 by :code:`myIter`.

    Raises
    ------
    AssertionError
        if :code:`shuffle` is True and :code:`buffer_size` is None
    """
    from_files = isinstance(data, str)

    if from_files:
        # Sorted so that the unshuffled file order does not depend on the
        # arbitrary ordering of os.listdir.
        filenames = [data + "/" + f for f in sorted(os.listdir(data))]
        # list_files shuffles the file names by default; forward the caller's
        # choice and seed so shuffle=False really yields a fixed order and
        # shuffle=True is reproducible for a given seed.
        file_dataset = tf.data.Dataset.list_files(filenames, shuffle=bool(shuffle), seed=seed)
        tf_dataset = file_dataset.interleave(
            lambda x: tf.data.experimental.CsvDataset(x, record_defaults=[tf.int32] * 3),
            cycle_length=tf.data.experimental.AUTOTUNE,
        )
    else:
        tf_dataset = tf.data.Dataset.from_tensor_slices(data)

    if shuffle:
        # Raised explicitly (same exception type as before) so the check is
        # not stripped when Python runs with -O.
        if buffer_size is None:
            raise AssertionError("buffer_size must be given when shuffle is True")
        tf_dataset = tf_dataset.shuffle(buffer_size=buffer_size, seed=seed, reshuffle_each_iteration=True)

    # repeat() without a count never exhausts; callers decide when to stop.
    tf_dataset = tf_dataset.repeat().batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

    iterator = iter(tf_dataset)
    if from_files:
        # CSV batches arrive as one tensor per column; stack them into one.
        iterator = myIter(iterator)

    return iterator