Source code for KGE.models.semantic_based.DistMult

"""An implementation of DistMult
"""

import numpy as np
import tensorflow as tf
from ..base_model.SemanticModel import SemanticModel
from ...loss import PairwiseHingeLoss
from ...ns_strategy import UniformStrategy
from ...constraint import normalized_embeddings, Lp_regularization

[docs]class DistMult(SemanticModel): """An implementation of DistMult from `[yang 2014] <https://arxiv.org/abs/1412.6575>`_. DistMult simplified RESCAL by restricting :math:`\\textbf{R}_i` to diagonal matrix. The score of :math:`(h,r,t)` is defined by a bilinear function: .. math:: f(h,r,t) = \\textbf{e}_h^{T} \\textbf{R}_{r} \\textbf{e}_t = \\textbf{e}_h^{T} diag(\\textbf{R}_{r}) \\textbf{e}_t = \sum_i (\\textbf{e}_h^{T})_i diag(\\textbf{R}_{r})_i (\\textbf{e}_t)_i where :math:`\\textbf{e}_i \in \mathbb{R}^k` are vector representations of the entities, and :math:`\\textbf{R}_i \in \mathbb{R}^{k \\times k}` is a diagonal matrix associated with the relation. If :code:`constraint=True` given in :py:func:`__init__`, renormalized :math:`\left\| \\textbf{e}_i \\right\|_2 = 1` to have unit length every iteration and conduct L2-regularization on :math:`\\textbf{R}` described in `original DistMult paper <https://arxiv.org/abs/1412.6575>`_: .. math:: regularization~term = \lambda \\times \sum_{i} {\left\| \\textbf{R}_i \\right\|}_F^2 = \lambda \\times \sum_i {\left\| diag(\\textbf{R}_i) \\right\|}_2^2 """
[docs] def __init__(self, embedding_params, negative_ratio, corrupt_side, loss_fn=PairwiseHingeLoss(margin=1), ns_strategy=UniformStrategy, constraint=True, constraint_weight=1.0, n_workers=1): """Initialized DistMult Parameters ---------- embedding_params : dict embedding dimension parameters, should have key :code:`'embedding_size'` for embedding dimension :math:`k` negative_ratio : int number of negative sample corrupt_side : str corrupt from which side while trainging, can be :code:`'h'`, :code:`'t'`, or :code:`'h+t'` loss_fn : class, optional loss function class :py:mod:`KGE.loss.Loss`, by default :py:mod:`KGE.loss.PairwiseHingeLoss` ns_strategy : function, optional negative sampling strategy, by default :py:func:`KGE.ns_strategy.uniform_strategy` constraint : bool, optional conduct constraint or not, by default :code:`True` constraint_weight : float, optional regularization weight :math:`\lambda`, by default 1.0 n_workers : int, optional number of workers for negative sampling, by default 1 """ super(DistMult, self).__init__(embedding_params, negative_ratio, corrupt_side, loss_fn, ns_strategy, n_workers) self.constraint = constraint self.constraint_weight = constraint_weight
def _init_embeddings(self, seed): """Initialized the DistMult embeddings. If :code:`model_weight_initial` not given in :py:func:`train`, initialized embeddings randomly, otherwise, initialized from :code:`model_weight_initial`. Parameters ---------- seed : int random seed """ if self._model_weights_initial is None: assert self.embedding_params.get("embedding_size") is not None, "'embedding_size' should be given in embedding_params when using RESCAL" limit = np.sqrt(6.0 / self.embedding_params["embedding_size"]) uniform_initializer = tf.initializers.RandomUniform(minval=-limit, maxval=limit, seed=seed) ent_emb = tf.Variable( uniform_initializer([len(self.metadata["ind2ent"]), self.embedding_params["embedding_size"]]), name="entities_embedding", dtype=np.float32 ) limit = np.sqrt(6.0 / self.embedding_params["embedding_size"]) uniform_initializer = tf.initializers.RandomUniform(minval=-limit, maxval=limit, seed=seed) rel_inter = tf.Variable( uniform_initializer([len(self.metadata["ind2rel"]), self.embedding_params["embedding_size"]]), name="relations_interaction", dtype=np.float32 ) self.model_weights = {"ent_emb": ent_emb, "rel_inter": rel_inter} else: self._check_model_weights(self._model_weights_initial) self.model_weights = self._model_weights_initial def _check_model_weights(self, model_weights): """Check the model_weights have necessary keys and dimensions. Parameters ---------- model_weights : dict model weights to check. """ assert model_weights.get("ent_emb") is not None, "entity embedding should be given in model_weights with key 'ent_emb'" assert model_weights.get("rel_inter") is not None, "relation interaction matrix should be given in model_weights with key 'rel_inter'" assert list(model_weights["ent_emb"].shape) == [len(self.metadata["ind2ent"]), self.embedding_params["embedding_size"]], \ "shape of 'ent_emb' should be (len(metadata['ind2ent']), embedding_params['embedding_size'])" assert list(model_weights["rel_inter"].shape) == [len(self.metadata["ind2rel"]), self.embedding_params["embedding_size"]], \ "shape of 'rel_inter' should be (len(metadata['ind2rel']), embedding_params['embedding_size'])"
[docs] def score_hrt(self, h, r, t): """ Score the triplets :math:`(h,r,t)`. If :code:`h` is :code:`None`, score all entities: :math:`(h_i, r, t)`. \n If :code:`t` is :code:`None`, score all entities: :math:`(h, r, t_i)`. \n :code:`h` and :code:`t` should not be :code:`None` simultaneously. Parameters ---------- h : tf.Tensor or np.ndarray or None index of heads with shape :code:`(n,)` r : tf.Tensor or np.ndarray index of relations with shape :code:`(n,)` t : tf.Tensor or np.ndarray or None index of tails with shape :code:`(n,)` Returns ------- tf.Tensor triplets scores with shape :code:`(n,)` """ h,r,t = super(DistMult, self).score_hrt(h,r,t) h_emb = tf.nn.embedding_lookup(self.model_weights["ent_emb"], h) t_emb = tf.nn.embedding_lookup(self.model_weights["ent_emb"], t) r_inter = tf.nn.embedding_lookup(self.model_weights["rel_inter"], r) return tf.reduce_sum(h_emb * r_inter * t_emb, axis=-1)
def _constraint_loss(self, X): """Perform constraint if necessary. Parameters ---------- X : batch_data batch data Returns ------- tf.Tensor regularization term with shape (1,) """ if self.constraint: self.model_weights["ent_emb"].assign(normalized_embeddings(X=self.model_weights["ent_emb"], p=2, axis=1, value=1)) r_inter = tf.nn.embedding_lookup(self.model_weights["rel_inter"], X[:, 1]) return self.constraint_weight * tf.reduce_mean(Lp_regularization(r_inter, p=2, axis=-1)) else: return 0