[Practice] Deep & Cross Network (DCN) for Ad CTR Models


The deep models usable for ad CTR prediction essentially share the same underlying architecture (see https://blog.csdn.net/dengxing1234/article/details/79916532), which also constrains how these models can evolve. Deep & Cross Network is a common choice for ad CTR models, and it focuses on the feature-engineering problem. As the paper puts it: a DNN can learn feature interactions automatically, but it generates all interactions implicitly, which is not necessarily efficient for learning every type of cross feature. The authors therefore propose the Deep & Cross Network (DCN), which keeps the benefits of a deep network while introducing a cross network that learns bounded-degree feature interactions more effectively; the cross network applies explicit feature crossing at every layer with no manual feature engineering, and the extra complexity it adds over a plain DNN is negligible. Following the paper and the high-level TensorFlow APIs, I implemented a version of DCN; all three source files are shared below. If you spot problems, I would welcome corrections and discussion.

Original paper: https://arxiv.org/abs/1708.05123
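
For reference, the heart of DCN is the per-layer crossing formula from the paper, which the CrossLayer class below implements. Each layer crosses the original input x_0 with the running input x_l and adds a residual connection:

    x_{l+1} = x_0 * (x_l^T w_l) + b_l + x_l

where w_l and b_l are d-dimensional weight and bias vectors, so x_l^T w_l is a per-example scalar and every cross layer preserves the input dimension d.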

my_core.py

from tensorflow.python.framework import tensor_shape
from tensorflow.python.layers import base
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import standard_ops
from tensorflow.python.framework import ops

class CrossLayer(base.Layer):
    def __init__(self,
                 use_bias=True,
                 kernel_initializer=None,
                 bias_initializer=init_ops.zeros_initializer(),
                 kernel_regularizer=None,
                 bias_regularizer=None,
                 activity_regularizer=None,
                 kernel_constraint=None,
                 bias_constraint=None,
                 trainable=True,
                 name=None,
                 **kwargs):
        super(CrossLayer, self).__init__(trainable=trainable, name=name,
                                activity_regularizer=activity_regularizer,
                                **kwargs)

        self.use_bias = use_bias
        self.kernel_initializer = kernel_initializer
        self.bias_initializer = bias_initializer
        self.kernel_regularizer = kernel_regularizer
        self.bias_regularizer = bias_regularizer
        self.kernel_constraint = kernel_constraint
        self.bias_constraint = bias_constraint
        self.input_spec = base.InputSpec(min_ndim=2)

    def build(self, input_shape):
        input_shape = tensor_shape.TensorShape(input_shape)
        if input_shape[-1].value is None:
            raise ValueError('The last dimension of the inputs to `DCN` ''should be defined. Found `None`.')
        self.dim = input_shape[-1].value
        self.input_spec = base.InputSpec(min_ndim=2,
                                         axes={-1: self.dim})

        self.kernel = self.add_variable('kernel',
                                        shape=[1, self.dim],
                                        initializer=self.kernel_initializer,
                                        regularizer=self.kernel_regularizer,
                                        constraint=self.kernel_constraint,
                                        dtype=self.dtype,
                                        trainable=True)
        if self.use_bias:
            self.bias = self.add_variable('bias',
                                          shape=[1,self.dim],
                                          initializer=self.bias_initializer,
                                          regularizer=self.bias_regularizer,
                                          constraint=self.bias_constraint,
                                          dtype=self.dtype,
                                          trainable=True)
        else:
            self.bias = None
        self.built = True


    def call(self, inputs, **kwargs):
        # One DCN cross step: x_{l+1} = x_0 * (x_l^T w) + b + x_l.
        x_0 = kwargs["x_0"]
        x_l = ops.convert_to_tensor(inputs, dtype=self.dtype)
        # (batch, dim) x (dim, 1) -> a per-example scalar x_l^T w.
        scalar = standard_ops.matmul(x_l, self.kernel, transpose_b=True)
        # Broadcast the scalar over x_0, then add the bias (if any) and the
        # residual connection x_l.
        dot_ = standard_ops.multiply(x_0, scalar)
        if self.use_bias:
            dot_ = standard_ops.add(dot_, self.bias)
        return standard_ops.add(dot_, x_l)

    def compute_output_shape(self, input_shape):
        input_shape = tensor_shape.TensorShape(input_shape)
        input_shape = input_shape.with_rank_at_least(2)
        if input_shape[-1].value is None:
            raise ValueError(
                'The innermost dimension of input_shape must be defined, but saw: %s'
                % input_shape)
        return input_shape


def cross_layer(
    inputs,
    x_0,
    use_bias=True,
    kernel_initializer=init_ops.truncated_normal_initializer(),
    bias_initializer=init_ops.zeros_initializer(),
    kernel_regularizer=None,
    bias_regularizer=None,
    activity_regularizer=None,
    kernel_constraint=None,
    bias_constraint=None,
    trainable=True,
    name=None,
    reuse=None
):
  """Functional interface for the dense cross layer."""
  layer = CrossLayer(
                use_bias=use_bias,
                kernel_initializer=kernel_initializer,
                bias_initializer=bias_initializer,
                kernel_regularizer=kernel_regularizer,
                bias_regularizer=bias_regularizer,
                activity_regularizer=activity_regularizer,
                kernel_constraint=kernel_constraint,
                bias_constraint=bias_constraint,
                trainable=trainable,
                name=name,
                dtype=inputs.dtype.base_dtype,
                _reuse=reuse)
  return layer.apply(inputs, x_0=x_0)
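
A quick shape check for CrossLayer (my own smoke test, not part of the three shared files; it assumes TF 1.x and that my_core.py is importable as model_extension.my_core). On the first cross layer the layer input and x_0 are the same tensor, and the output keeps the input shape [batch, dim]:

import tensorflow as tf
from model_extension import my_core

x0 = tf.constant([[1.0, 2.0, 3.0],
                  [4.0, 5.0, 6.0]])
# First cross layer: the running input x_l is x_0 itself.
out = my_core.cross_layer(x0, x0, name='cross_test')

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(out).shape)  # (2, 3): the cross layer preserves shape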



dcn.py
# -*- coding:utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import six
from tensorflow.python.estimator import estimator
from tensorflow.python.estimator import model_fn
from tensorflow.python.estimator.canned import head as head_lib
from tensorflow.python.estimator.canned import optimizers
from tensorflow.python.feature_column import feature_column as feature_column_lib
from tensorflow.python.layers import core as core_layers
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import nn
from tensorflow.python.ops import partitioned_variables
from tensorflow.python.ops import variable_scope
from tensorflow.python.summary import summary
from tensorflow.python.training import training_util

import tensorflow as tf
from model_extension import my_core

_LEARNING_RATE = 0.005

def _add_hidden_layer_summary(value, tag):
  summary.scalar('%s/fraction_of_zero_values' % tag, nn.zero_fraction(value))
  summary.histogram('%s/activation' % tag, value)


def _dcn_logit_fn_builder(units,
                          hidden_units,
                          feature_columns,
                          activation_fn,
                          dropout,
                          num_cross_layers,
                          input_layer_partitioner):
  """Function builder for a dcn logit_fn"""
  if not (isinstance(units, int) or isinstance(units, list)):
    raise ValueError('units must be an int or list.  Given type: {}'.format(
        type(units)))


  def dcn_logit_fn(features, mode):

    with variable_scope.variable_scope(
        'input_from_feature_columns',
        values=tuple(six.itervalues(features)),
        partitioner=input_layer_partitioner):
      x0 = feature_column_lib.input_layer(
          features=features, feature_columns=feature_columns)
      deep_net = x0
      cross_net = x0
    for layer_id, num_hidden_units in enumerate(hidden_units):
      with variable_scope.variable_scope(
          'hiddenlayer_%d' % layer_id, values=(deep_net,)) as hidden_layer_scope:
        deep_net = core_layers.dense(
            deep_net,
            units=num_hidden_units,
            activation=activation_fn,
            kernel_initializer=init_ops.glorot_uniform_initializer(),
            name=hidden_layer_scope)
        if dropout is not None and mode == model_fn.ModeKeys.TRAIN:
          deep_net = core_layers.dropout(deep_net, rate=dropout, training=True)
      _add_hidden_layer_summary(deep_net, hidden_layer_scope.name)

    if num_cross_layers == 0:
      last_layer = deep_net
    else:
      for layer_id in range(num_cross_layers):
        with variable_scope.variable_scope(
            'crosslayer_%d' % layer_id, values=(cross_net,)) as cross_layer_scope:
          cross_net = my_core.cross_layer(
              cross_net,
              x0,
              name=cross_layer_scope)
        _add_hidden_layer_summary(cross_net, cross_layer_scope.name)
      # Concatenate the cross tower and the deep tower before the logits layer.
      last_layer = tf.concat([cross_net, deep_net], 1)


    if isinstance(units, int):
      with variable_scope.variable_scope(
          'logits', values=(last_layer,)) as logits_scope:
        logits = core_layers.dense(
            last_layer,
            units=units,
            activation=None,
            kernel_initializer=init_ops.glorot_uniform_initializer(),
            name=logits_scope)
      _add_hidden_layer_summary(logits, logits_scope.name)
    else:
      logits = []
      for head_index, logits_dimension in enumerate(units):
        with variable_scope.variable_scope(
            'logits_head_{}'.format(head_index), values=(last_layer,)) as logits_scope:
          these_logits = core_layers.dense(
              last_layer,
              units=logits_dimension,
              activation=None,
              kernel_initializer=init_ops.glorot_uniform_initializer(),
              name=logits_scope)
        _add_hidden_layer_summary(these_logits, logits_scope.name)
        logits.append(these_logits)
    return logits

  return dcn_logit_fn


def _dcn_model_fn(
        features,
        labels,
        mode,
        head,
        hidden_units,
        feature_columns,
        optimizer='Adagrad',
        activation_fn=nn.relu,
        dropout=None,
        num_cross_layers=None,
        input_layer_partitioner=None, config=None):
  """Deep & Cross Network model_fn."""
  if not isinstance(features, dict):
    raise ValueError('features should be a dictionary of `Tensor`s. '
                     'Given type: {}'.format(type(features)))
  optimizer = optimizers.get_optimizer_instance(
      optimizer, learning_rate=_LEARNING_RATE)
  num_ps_replicas = config.num_ps_replicas if config else 0

  partitioner = partitioned_variables.min_max_variable_partitioner(
      max_partitions=num_ps_replicas)
  with variable_scope.variable_scope(
      'dcn',
      values=tuple(six.itervalues(features)),
      partitioner=partitioner):
    input_layer_partitioner = input_layer_partitioner or (
        partitioned_variables.min_max_variable_partitioner(
            max_partitions=num_ps_replicas,
            min_slice_size=64 << 20))

    logit_fn = _dcn_logit_fn_builder(
        units=head.logits_dimension,
        hidden_units=hidden_units,
        feature_columns=feature_columns,
        activation_fn=activation_fn,
        dropout=dropout,
        num_cross_layers=num_cross_layers,
        input_layer_partitioner=input_layer_partitioner)
    logits = logit_fn(features=features, mode=mode)

    def _train_op_fn(loss):
      """Returns the op to optimize the loss."""
      return optimizer.minimize(
          loss,
          global_step=training_util.get_global_step())

    return head.create_estimator_spec(
        features=features,
        mode=mode,
        labels=labels,
        train_op_fn=_train_op_fn,
        logits=logits)


class DCNClassifier(estimator.Estimator):

  def __init__(self,
               hidden_units,
               feature_columns,
               model_dir=None,
               n_classes=2,
               weight_column=None,
               label_vocabulary=None,
               optimizer='Adagrad',
               activation_fn=nn.relu,
               dropout=None,
               num_cross_layers=0,
               input_layer_partitioner=None,
               config=None):

    if n_classes == 2:
      head = head_lib._binary_logistic_head_with_sigmoid_cross_entropy_loss(  # pylint: disable=protected-access
          weight_column=weight_column,
          label_vocabulary=label_vocabulary)
    else:
      head = head_lib._multi_class_head_with_softmax_cross_entropy_loss(  # pylint: disable=protected-access
          n_classes, weight_column=weight_column,
          label_vocabulary=label_vocabulary)
    def _model_fn(features, labels, mode, config):
      return _dcn_model_fn(
          features=features,
          labels=labels,
          mode=mode,
          head=head,
          hidden_units=hidden_units,
          feature_columns=tuple(feature_columns or []),
          optimizer=optimizer,
          activation_fn=activation_fn,
          dropout=dropout,
          num_cross_layers=num_cross_layers,
          input_layer_partitioner=input_layer_partitioner,
          config=config)
    super(DCNClassifier, self).__init__(
        model_fn=_model_fn, model_dir=model_dir, config=config)


deep_cross.py

# -*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import shutil
import sys
from model_extension.dcn import DCNClassifier
import tensorflow as tf


_CSV_COLUMNS_I = [("I"+str(x)) for x in range(13)]
_CSV_COLUMNS_C = [("C"+str(x)) for x in range(26)]
_CSV_COLUMNS = _CSV_COLUMNS_I + _CSV_COLUMNS_C
_CSV_COLUMNS.insert(0,"label")

_CSV_COLUMN_DEFAULTS_I=[[0.0]for x in range(13)]
_CSV_COLUMN_DEFAULTS_C=[['']for x in range(26)]
_CSV_COLUMN_DEFAULTS = _CSV_COLUMN_DEFAULTS_I + _CSV_COLUMN_DEFAULTS_C
_CSV_COLUMN_DEFAULTS.insert(0,[0])

parser = argparse.ArgumentParser()

parser.add_argument(
    '--model_dir', type=str, default='../tmp/criteo_model/deep_cross',
    help='Base directory for the model.')

parser.add_argument(
    '--model_type', type=str, default='deep_cross',
    help="Valid model types: {'deep_cross'}.")

parser.add_argument(
    '--train_epochs', type=int, default=40, help='Number of training epochs.')

parser.add_argument(
    '--epochs_per_eval', type=int, default=2,
    help='The number of training epochs to run between evaluations.')

parser.add_argument(
    '--batch_size', type=int, default=40, help='Number of examples per batch.')

parser.add_argument(
    '--train_data', type=str, default='../data/criteo_data/criteo_train.txt',
    help='Path to the training data.')

parser.add_argument(
    '--test_data', type=str, default='../data/criteo_data/criteo_test.txt',
    help='Path to the test data.')

_NUM_EXAMPLES = {
    'train': 20000000,
    'validation': 10000000,
}


def build_model_columns():
    """Builds a set of deep feature columns."""
    numeric_feature = [tf.feature_column.numeric_column('I'+str(x)) for x in range(13)]

    categorical_feature = [tf.feature_column.categorical_column_with_hash_bucket(
        'C'+str(x), hash_bucket_size=1000) for x in range(26)]

    embedding_feature = [tf.feature_column.embedding_column(x, dimension=8) for x in categorical_feature]

    deep_columns = embedding_feature + numeric_feature

    return deep_columns


def build_estimator(model_dir, model_type):
    """Build an estimator appropriate for the given model type."""
    deep_columns = build_model_columns()

    hidden_units = [2]

    # Create a tf.estimator.RunConfig to ensure the model is run on CPU, which
    # trains faster than GPU for this model.
    run_config = tf.estimator.RunConfig().replace(
        session_config=tf.ConfigProto(device_count={'GPU': 0}))

    if model_type == 'deep_cross':
        return DCNClassifier(
            model_dir=model_dir,
            hidden_units=hidden_units,
            feature_columns=deep_columns,
            optimizer=tf.train.ProximalAdagradOptimizer(
            learning_rate=0.001,
            l2_regularization_strength=0.001
            ),
            num_cross_layers=2,
            config=run_config)


def input_fn(data_file, num_epochs, shuffle, batch_size):
    """Generate an input function for the Estimator."""
    assert tf.gfile.Exists(data_file), (
            '%s not found. Please make sure you have either run data_download.py or '
            'set both arguments --train_data and --test_data.' % data_file)

    def parse_csv(value):
        print('Parsing', data_file)
        columns = tf.decode_csv(value, record_defaults=_CSV_COLUMN_DEFAULTS)

        features = dict(zip(_CSV_COLUMNS, columns))
        labels = features.pop('label')
        return features, labels

    # Extract lines from input files using the Dataset API.
    dataset = tf.data.TextLineDataset(data_file)

    if shuffle:
        dataset = dataset.shuffle(buffer_size=_NUM_EXAMPLES['train'])

    dataset = dataset.map(parse_csv, num_parallel_calls=5)

    # We call repeat after shuffling, rather than before, to prevent separate
    # epochs from blending together.
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)

    iterator = dataset.make_one_shot_iterator()
    features, labels = iterator.get_next()
    return features, labels


def main(unused_argv):
    # Clean up the model directory if present
    shutil.rmtree(FLAGS.model_dir, ignore_errors=True)
    model = build_estimator(FLAGS.model_dir, FLAGS.model_type)


    train_spec = tf.estimator.TrainSpec(input_fn=lambda: input_fn(
        FLAGS.train_data, 2, True, 500), max_steps=8000)
    eval_spec = tf.estimator.EvalSpec(input_fn=lambda: input_fn(
        FLAGS.test_data, 1, False, 500), steps=2000)
    tf.estimator.train_and_evaluate(model, train_spec, eval_spec)


if __name__ == '__main__':
    tf.logging.set_verbosity(tf.logging.INFO)
    FLAGS, unparsed = parser.parse_known_args()
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
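
For completeness, this is my reading of the CSV layout input_fn expects (a synthetic record, not real Criteo data; it assumes the module-level _CSV_COLUMNS and _CSV_COLUMN_DEFAULTS above are in scope): one integer label, the 13 numeric columns I0..I12, then the 26 categorical columns C0..C25, comma-separated.

import tensorflow as tf

# Synthetic record matching _CSV_COLUMN_DEFAULTS: int label, 13 floats, 26 strings.
fake_line = ','.join(['1'] + ['0.5'] * 13 + ['tok%d' % i for i in range(26)])
columns = tf.decode_csv([fake_line], record_defaults=_CSV_COLUMN_DEFAULTS)
features = dict(zip(_CSV_COLUMNS, columns))
labels = features.pop('label')
print(len(features))  # 39 feature tensors, one per Criteo field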