tensorflow联邦学习框架整理

louishao 2020-05-04

目录

High level api

custom model

定义模型需要继承自tff.learning.Model类,同时根据联邦学习的流程需要定义好,模型训练和辅助训练变量。变量必须要使用tf的类型,因为在实际环境中,联邦学习是要部署到移动端的,调用的不一定是python。

MnistVariables = collections.namedtuple(‘MnistVariables‘, ‘weights bias num_examples loss_sum accuracy_sum‘)  

# total variable
def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name=‘weights‘,
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name=‘bias‘,
          trainable=True),
      num_examples=tf.Variable(0.0, name=‘num_examples‘, trainable=False),
      loss_sum=tf.Variable(0.0, name=‘loss_sum‘, trainable=False),
      accuracy_sum=tf.Variable(0.0, name=‘accuracy_sum‘, trainable=False))

有了模型的变量之后定义模型的前向传播过程,注意在前向传播过程中variableloss等参数都进行了修改,同时定义ServerClient得到的数据。

def mnist_forward_pass(variables, batch):
  y = tf.nn.softmax(tf.matmul(batch[‘x‘], variables.weights) + variables.bias)
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch[‘y‘], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch[‘y‘]), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions
def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

在从Client得到数据后,Server要做的就是对数据进行整合。这里metrics参数对应的是get_local_mnist_metrics的所有结果。tff是面向所有Client的,我理解的下面的操作都是从一个list dict中做加权平均,这里的metrics参数没有体现list。

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

有了上面的模型参数前向传播返回结果聚合结果后,定义模型。这个定义模型,我理解的是对一个Client的模型,上述的模型参数前向传播返回结果聚合结果都是针对Client而言的,猜测tff从一个Client到多个clients实现了一些包装,是这个过程没有体现在代码里。这里model要实现定义模型参数,可训练参数,不可训练参数,前向传播,本地变量和指定输入数据类型,汇报结果和结果整合。其中tff.learning.BatchOutputtff中封装输出结果的结构。

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch[‘x‘])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

建立好模型之后进行模型训练:

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))

state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print(‘round  1, metrics={}‘.format(metrics))

FC core

这部分是如何从底层创建联邦学习模型,首先要准备要了解一些概念。

数据类型

要显示的指出数据存储在C/S端,是全局唯一的还是多份拷贝的。注意的是使用print函数输出时,会将数据类型和值一起输出,成为compat notation

  • 与端无关的数据

    • 张量类型,tff.TensorType。需要指定它的元素数据类型dtype和形状shape
    • 列表类型,tff.SequenceType。其中的元素类型应当为TFF的tff.Type或者是能转换成tff.Type的东西。print打印列表类型数据时,会出现*表示列表。
    • 元组类型,tff.NamedTupleTypetff.NamedTupleType接受三种类型的输入:listtuplecollections.OrderedDictprint打印元组类型以<>作为标记。
    • 函数类型,tff.FunctionTypetff.FunctionType需要指定函数的输入类型,且只能有一个输入值,和一个函数返回值。
  • 与端有关的数据类型

    端有关的类型,主要完成两件任务:

    • 显式地定义数据值应该存放在C端还是S端(Placementtff.SERVERortff.CLIENTS
    • 定义这个数据是否全局一致(All equal
  • 联邦数据类型

    tff.FederatedType把上面提到的端无关类型包装起来,并增加Placementall_equal两个属性。其中all_equal可选,如果placement=tff.SERVER,则默认为True。使用print函数打印变量时,花括号{}表示非全局唯一,而没有花括号就表示全局唯一。

  • 变量声明

    定义变量类型后,声明变量使用tff.utils.create_variables(name, type),如

    OUR_TYPE = tff.TensorType(tf.int8, shape=[10])
    var = tff.utils.create_variables(‘var_name‘, OUR_TYPE)
    print(OUR_TYPE)
    print(var)

函数定义

  • 与端无关的函数

    函数需要使用tff.tf_computation(type)wrap up函数,其中type表示函数传入形参x的类型。

    @tff.tf_computation(tff.SequenceType(tf.int32))
    def add_up_integeres(x):
        return x.reduce(np.int32(0), lambda x, y: x+y)
  • 与端有关的函数

    与端有关的函数不仅需要指定类型,还需要指定Placement。装饰器也变为tff.federated_computation

    @tff.federated_computation(tff.FederatedType(tf.float32, tff.Clients))
    def get_average_temperature(sensor_readings):
        return tff.federated_mean(sensor_readings)
    
    print(get_average_temperature.type_signature)

逻辑回归实例

下面以逻辑回归为例,整理数据准备到模型训练的过程。跟上面的high-level api的明显区别是,从底层构建联邦学习要明确定义好,函数的输入输出类型。

  • 准备数据

    数据存放是长度为10的list->每个数字user个batch这样的格式。例如federated_train_data[5]表示就是数字都为5的batch list。

    import collections
    
    import numpy as np
    import tensorflow as tf
    import tensorflow_federated as tff
    
    tf.compat.v1.enable_v2_behavior()
    tff.framework.set_default_executor(tff.framework.ReferenceExecutor())
    
    mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()
    
    NUM_EXAMPLES_PER_USER = 1000
    BATCH_SIZE = 50
    
    
    def get_data_for_digit(source, digit):
        output_sequence = []
        all_samples = [i for i, d in enumerate(source[1]) if d == digit]
        for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE):
            batch_samples = all_samples[i:i + BATCH_SIZE]
            output_sequence.append({
                ‘x‘:
                    np.array([source[0][i].flatten() / 255.0 for i in batch_samples],
                             dtype=np.float32),
                ‘y‘:
                    np.array([source[1][i] for i in batch_samples], dtype=np.int32)
            })
        return output_sequence
    
    
    federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)]
    
    federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]

    在整理好训练数据后,定义每个batch的数据类型

    BATCH_SPEC = collections.OrderedDict(
        x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
        y=tf.TensorSpec(shape=[None], dtype=tf.int32))
    BATCH_TYPE = tff.to_type(BATCH_SPEC)
    
    print(str(BATCH_TYPE))

参考资料

  1. Tensorflow Federated Tutorial: Federated Learning for Image Classification
  2. Tensorflow Federated Tutorial: Custom Federated Algorithms
  3. Zing22, Tensorflow Federated Framework 谷歌联邦学习框架:自底向上简明入门

相关推荐