114 - Building a TensorFlow Cluster Across Two Machines: LinearRegression and SkipGram

Distributed TensorFlow official example
TensorFlow Recommenders
Notes on distributed TensorFlow pitfalls
Embedding solutions for multi-valued categorical features
[TensorFlow: A system for large-scale machine learning]
[Parameter Server for Distributed Machine Learning]

1. Running Distributed TensorFlow on Your Machine

Code for test_distributed_tf.py:

import argparse
import sys

import numpy as np
import tensorflow as tf

FLAGS = None
BATCH_SIZE = 10
LEARNING_RATE = 0.1


def generate_samples():
    TRUE_W = 1.0
    TRUE_B = 2.0
    NOISE_RATIO = 0.1
    xs = np.random.uniform(low=-10.0, high=10.0, size=[BATCH_SIZE, 1])
    noise = np.random.uniform(low=-1.0, high=1.0, size=[BATCH_SIZE, 1])
    ys = xs * TRUE_W + TRUE_B + NOISE_RATIO * noise
    return xs, ys


def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    # Hardcoded for this two-machine demo; overrides the --ps_hosts flag.
    ps_hosts = ["host_1_ip:2222", "host_1_ip:2223"]

    worker_hosts = FLAGS.worker_hosts.split(",")
    # worker_hosts = ["localhost:2223", "localhost:2224"]
    # Hardcoded for this two-machine demo; overrides the --worker_hosts flag.
    worker_hosts = ["host_2_ip:2222", "host_2_ip:2223"]

    # Create a cluster from the parameter server and worker hosts.
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    # Create and start a server for the local task.
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        print("this is a ps server>>>>>>>>>>>>>>>>job_name={}, task_index={}".format(FLAGS.job_name, FLAGS.task_index))
        server.join()
    elif FLAGS.job_name == "worker":
        print("this is a worker>>>>>>>>>>>>>>>>>>>job_name={}, task_index={}".format(FLAGS.job_name, FLAGS.task_index))
        # Assigns ops to the local worker by default; Variables are placed on the ps tasks.
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index,
                                                      cluster=cluster)):
            # Define a simple linear regression model.
            with tf.variable_scope("model") as scope:
                x = tf.placeholder(tf.float32, shape=[None, 1], name="x")
                y = tf.placeholder(tf.float32, shape=[None, 1], name="y")
                w = tf.Variable(tf.constant(0.5), name="w")
                b = tf.Variable(tf.constant(0.0), name="b")
                loss = tf.pow(w * x + b - y, 2)

            # Use Firefox instead of Chrome to view TensorBoard; Chrome may fail to render the graphs.
            tf.summary.scalar("w", w)
            tf.summary.scalar("b", b)
            tf.summary.scalar("loss", tf.reduce_mean(loss))
            summary_op = tf.summary.merge_all()
            # saver = tf.train.Saver(max_to_keep=2, sharded=True, allow_empty=True)
            scaffold = tf.train.Scaffold(summary_op=summary_op)

            global_step = tf.train.get_or_create_global_step()

            # The StopAtStepHook handles stopping after running the given number of steps.
            hooks = [tf.train.StopAtStepHook(last_step=1000)]

            optimizer = tf.train.AdagradOptimizer(LEARNING_RATE)
            # optimizer = tf.train.GradientDescentOptimizer(LEARNING_RATE)

            is_sync_training = False
            if is_sync_training:
                # Synchronous training: aggregate gradients from all replicas before applying them.
                optimizer = tf.train.SyncReplicasOptimizer(optimizer, replicas_to_aggregate=2, total_num_replicas=2)
                hooks.append(optimizer.make_session_run_hook(FLAGS.task_index == 0))
            train_op = optimizer.minimize(loss, global_step=global_step)

        # The MonitoredTrainingSession takes care of session initialization,
        # restoring from a checkpoint, saving to a checkpoint, and closing when done
        # or when an error occurs.
        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=(FLAGS.task_index == 0),
                                               scaffold=scaffold,
                                               checkpoint_dir="/tmp/train_logs",
                                               save_summaries_steps=10,
                                               summary_dir="/tmp/summary_logs",
                                               hooks=hooks) as session:

            while not session.should_stop():
                # Run a training step asynchronously.
                # See `tf.train.SyncReplicasOptimizer` for additional details on how to
                # perform *synchronous* training.
                # mon_sess.run handles AbortedError in case of a preempted PS.

                xx, yy = generate_samples()
                session.run(train_op, feed_dict={x: xx, y: yy})
                lv, step, wv, bv = session.run([loss, global_step, w, b], feed_dict={x: xx, y: yy})
                print("job_name={} task_index={}, step={}, w={}, b={}, loss={}"
                      .format(FLAGS.job_name, FLAGS.task_index, step, wv, bv, np.mean(lv.tolist())))


def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.register("type", "bool", lambda v: v.lower() == "true")
    # Flags for defining the tf.train.ClusterSpec
    parser.add_argument(
        "--ps_hosts",
        type=str,
        default="",
        help="Comma-separated list of hostname:port pairs"
    )
    parser.add_argument(
        "--worker_hosts",
        type=str,
        default="",
        help="Comma-separated list of hostname:port pairs"
    )
    parser.add_argument(
        "--job_name",
        type=str,
        default="",
        help="One of 'ps', 'worker'"
    )
    # Flags for defining the tf.train.Server
    parser.add_argument(
        "--task_index",
        type=int,
        default=0,
        help="Index of task within the job"
    )
    return parser.parse_known_args()


if __name__ == "__main__":
    FLAGS, unparsed = parse_arguments()
    print("FLAGS={}".format(FLAGS))
    print("unparsed={}".format(unparsed))

    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

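A note on how the graph above is split across the cluster: tf.train.replica_device_setter places Variables on the ps tasks (round-robin by default) while the remaining ops stay on the worker device. A minimal standalone sketch, reusing the same host placeholders, that inspects the resulting placement:

import tensorflow as tf

cluster = tf.train.ClusterSpec({"ps": ["host_1_ip:2222", "host_1_ip:2223"],
                                "worker": ["host_2_ip:2222", "host_2_ip:2223"]})

with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:0",
                                              cluster=cluster)):
    w = tf.Variable(0.5, name="w")   # -> /job:ps/task:0
    b = tf.Variable(0.0, name="b")   # -> /job:ps/task:1
    y = w * 3.0 + b                  # -> /job:worker/task:0

# Device strings can be inspected without starting the servers.
print(w.device, b.device, y.device)
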
Install TensorFlow on the two machines:
Mac: Python 2.7, TensorFlow 1.13
Ubuntu: Python 2.7, TensorFlow 2.2 (run the code in TF1.x compatibility mode)
Note: create the /tmp/train_logs directory on both machines beforehand, so that permission problems do not cause the run to fail.
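
Since the Ubuntu machine runs TensorFlow 2.2, the TF1-style code above needs the v1 compatibility layer. A minimal sketch of the usual approach (assuming no TF2-only behavior is needed) is to change only the import at the top of the script:

# Run the TF1-style script on TensorFlow 2.x via the compatibility module.
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()  # restores graph mode, placeholders and the tf.train.* APIs used above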

On host_1, first start ps server 0:

$ python test_distributed_tf.py --job_name=ps --task_index=0

Open another terminal on host_1 and start ps server 1:

$ python test_distributed_tf.py --job_name=ps --task_index=1

On host_2, start worker 0 and worker 1:

$ python test_distributed_tf.py --job_name=worker --task_index=0
$ python test_distributed_tf.py --job_name=worker --task_index=1

Display the training metrics in TensorBoard (Chrome may have trouble rendering the charts; use Firefox instead):

$ tensorboard --logdir=/tmp/summary_logs


2. TensorFlow Distributed Example: skip-gram

Distributed training of the skip-gram model:

import json
import os

import tensorflow as tf
import numpy as np

import input_sample

embedding_size = 200
vocabulary_size = 10000
batch_size = 64
num_sampled = 32


def skip_gram_model(_):
    # Set these per process, e.g. from command-line flags as in test_distributed_tf.py above.
    job_name = ""        # "ps" or "worker"
    task_index = 0
    # Example cluster spec; replace the addresses with your own hosts.
    cluster_spec = {"ps": ["host_1_ip:2222"], "worker": ["host_2_ip:2222", "host_2_ip:2223"]}
    cluster = tf.train.ClusterSpec(cluster_spec)
    server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

    if job_name == "ps":
        print(">>>>>>>>>>skip_gram_embedding ps server keep waiting job_name_index={}-{}".format(job_name, task_index))
        server.join()
    elif job_name == "worker":
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
            embeddings = tf.Variable(tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
            x_inputs = tf.placeholder(tf.int32, shape=[batch_size])
            y_target = tf.placeholder(tf.int32, shape=[batch_size, 1])

            embed = tf.nn.embedding_lookup(embeddings, x_inputs)
            nce_weights = tf.Variable(tf.truncated_normal([vocabulary_size, embedding_size], stddev=1.0 / np.sqrt(embedding_size)))
            nce_bias = tf.Variable(tf.zeros([vocabulary_size]))
            loss = tf.reduce_mean(tf.nn.nce_loss(nce_weights, nce_bias, y_target, embed, num_sampled, vocabulary_size))

            global_step = tf.train.get_or_create_global_step()
            train_op = tf.train.GradientDescentOptimizer(learning_rate=0.5).minimize(loss, global_step=global_step)

            hooks = [tf.train.StopAtStepHook(last_step=20000)]

        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=(task_index == 0),
                                               checkpoint_dir="hdfs:///skip_gram/model",
                                               hooks=hooks,
                                               save_checkpoint_secs=None,
                                               save_checkpoint_steps=20) as mon_sess:
            train_step = 0

            while not mon_sess.should_stop():
                train_step += 1
                batch_input, batch_labels = input_sample.generate_batch_data()
                fd = {x_inputs: batch_input, y_target: batch_labels}

                mon_sess.run(train_op, feed_dict=fd)

                if train_step % 100 == 0:
                    loss_val, step_val = mon_sess.run([loss, global_step], feed_dict=fd)
                    print("task_index={}, step={}, loss={}".format(task_index, step_val, loss_val))
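
The listing depends on an input_sample module that produces training batches. A hypothetical sketch (for illustration only; the real module would slide a window over a tokenized corpus) of what generate_batch_data() is expected to return, matching the placeholder shapes above:

import numpy as np

VOCABULARY_SIZE = 10000
BATCH_SIZE = 64

def generate_batch_data():
    # Random word ids as a stand-in: center-word ids of shape [batch_size]
    # and context-word ids of shape [batch_size, 1], as tf.nn.nce_loss expects.
    centers = np.random.randint(0, VOCABULARY_SIZE, size=[BATCH_SIZE]).astype(np.int32)
    contexts = np.random.randint(0, VOCABULARY_SIZE, size=[BATCH_SIZE, 1]).astype(np.int32)
    return centers, contexts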