維度、廣播操作與可視化:如何高效使用TensorFlow
一、Tensorflow 基礎
TensorFlow 和其他諸如 numpy 之類的數學計算庫的根本區別在于:在 TensorFlow 中,運算操作是符號化的。這是一個強大的思想,它能夠讓 TensorFlow 做任何事情(例如,自動求微分),而這些事情對于 numpy 等命令式的庫而言是不可能完成的。但是這也導致了隨之而來的代價,就是掌握這個庫會相對困難一些。在本文中,作者會嘗試揭開 TensorFlow 的神秘面紗,并提供一些關于高效使用 TensorFlow 的指南和實踐例子。
讓我們以一個簡單的例子開始,我們想讓兩個隨機矩陣相乘。首先我們看一下用 numpy 來實現這個例子:
- import numpy as np
- x = np.random.normal(size=[10, 10])
- y = np.random.normal(size=[10, 10])
- z = np.dot(x, y)print(z)
這樣的計算在 TensorFlow 中會是什么樣子?
結果如下:
- import tensorflow as tf
- x = tf.random_normal([10, 10])
- y = tf.random_normal([10, 10])
- z = tf.matmul(x, y)
- sess = tf.Session()
- z_val = sess.run(z)print(z_val)
與 numpy 直接執行計算并將結果復制到變量 z 中的思路不同的是,TensorFlow 僅僅給圖中代表結果的節點給提供了一個操作。如果我們直接打印 z 的值,我們會得到下面的信息:
- Tensor("MatMul:0", shape=(10, 10), dtype=float32)
由于兩個輸入矩陣都有被完全定義的維度,TensorFlow 還能夠在指定張量的維度的同時指定它的數據類型。為了計算出張量的值,我們需要使用 Session.run() 這個函數來創建一個會話。
Tip:在使用 Jupyter notebook 的時候,要確保在開始的時候調用一下 tf.reset_default() 函數,以在定義新節點之前清空符號圖。 |
為了理解符號計算有多么強大,讓我們來看一下另一個例子。假設我們有一些一條曲線上的樣本點(例如曲線是 f(x) = 5x^2 + 3),但是我們想要在不知道參數的情況下來估計這個函數 f(x)。我們定義一個含參數的函數 g(x, w) = w0 x^2 + w1 x + w2,它是關于輸入數據 x 和隱藏參數 w 的函數,我們的目標就是找到這組隱藏參數,使得 g(x, w) ≈ f(x)。我們可以通過最小化下面的損失函數 L(w) 來實現:L(w) = (f(x) - g(x, w))^2。盡管這個簡單的問題已經有一個閉合的解決方法了,但我們還是選擇使用一個更加通用的方法,這個方法能夠被應用在任何可微分的函數中,它使用了隨機梯度降的方法。我們簡單地計算損失函數 L(w) 在一組樣本點上關于 w 的平均梯度,然后朝著梯度的反方向變化參數 w。
下面展示了這個方法在 TensorFlow 中是如何實現的:
- import numpy as np
- import tensorflow as tf
- # 使用占位符從python向TensorFlow運算符中傳遞參數值。我們在這里定義了兩個占位符,其中一個用來存放輸入特征x,另一個用來存放輸出y.
- x = tf.placeholder(tf.float32)
- y = tf.placeholder(tf.float32)
- # 假設我們已經知道了期望的函數是一個二次多項式,我們就會分配一個具有3個元素的向量來代表這些參數。這些變量會被以隨機地進行初始化。
- w = tf.get_variable("w", shape=[3, 1])
- # 我們定義yhat為我們對y的估計值
- f = tf.stack([tf.square(x), x, tf.ones_like(x)], 1)
- yhat = tf.squeeze(tf.matmul(f, w), 1)
- # 損失函數被定義為y的估計值和真實值之間地l2距離。我們還附加了一個收縮項,以確保結果得到的權值會比較小。
- loss = tf.nn.l2_loss(yhat - y) + 0.1 * tf.nn.l2_loss(w)
- # 我們以0.1的學習率使用Adam優化器來最小化損失函數。
- train_op = tf.train.AdamOptimizer(0.1).minimize(loss)
- def generate_data():
- x_val = np.random.uniform(-10.0, 10.0, size=100)
- y_val = 5 * np.square(x_val) + 3
- return x_val, y_val
- sess = tf.Session()
- # 因為我們要使用這些變量,所以我們需要先將它們初始化
- sess.run(tf.global_variables_initializer())
- for _ in range(1000):
- x_val, y_val = generate_data()
- _, loss_val = sess.run([train_op, loss], {x: x_val, y: y_val})
- print(loss_val)
- print(sess.run([w]))
運行完這段代碼之后,我得到的參數結果是:
- [4.98605919,-0.00187828875e-04,3.8395009]
上面是編輯運行完之后的結果,它對應的損失值是 17.6175. 每一次的具體結果都會不同,但是最終結果都很接近期望的函數值。下面是原文作者提供的值。
- [4.9924135,0.00040895029, 3.4504161]
這是與期望參數相當接近的近似。
這只是 TensorFlow 能夠做到的事情的冰山一角而已。很多類似于優化具有上百萬個參數的大型神經網絡的問題都能夠用 TensorFlow 以很少量的代碼來高效地實現。與此同時,TensorFlow 的開發團隊還致力于在多種設備、多線程以及支持多平臺等問題上更進一步。
簡單起見,在絕大多數例子中我們都手動地創建了會話,我們并沒有保存和加載 checkpoint,但是這卻是我們在實戰中經常需要做的事情。你很可能想著使用估計 API 來進行會話管理以及做日志。我們在 code/framework 路徑下提供了一個簡單的可擴展架構,作為使用 TensorFlow 來訓練神經網絡的一個實際架構的例子。
理解靜態維度和動態維度
TensorFlow 中的張量具有靜態維度的屬性,它在構建圖的時候就被確定好了。靜態維度也有可能是不確定的。舉個例子,我們也許會定義一個維度為 [None,128] 的張量。
- import tensorflow as tf
- a = tf.placeholder([None, 128])
這意味著***個維度可以是任意大小,會在 Session.run() 的過程中被動態地決定。在表現靜態張量的時候,TensorFlow 有著相當丑的 API:
- static_shape = a.get_shape().as_list() # returns [None, 128]
(這個應該寫成 a,shape() 的形式,但是這里有人把它定義得太不方便了。)
為獲得張量的動態形式,你可以調用 tf.shape 功能,它會返回一個表示給定張量的形狀的張量:
- dynamic_shape = tf.shape(a)
一個張量的靜態維度可以使用 Tensor.set_shape() 函數來進行設置。
- a.set_shape([32, 128])
僅當你知道自己在做什么的時候再使用這個函數,事實上使用 tf.reshape() 會更加安全。
- a = tf.reshape(a, [32, 128])
如果有一個函數能在方便的時候返回靜態維度,在可用的時候返回動態維度,那將會很方便。下面就定義了這樣一個函數:
- def get_shape(tensor):
- static_shape = tensor.get_shape().as_list()
- dynamic_shape = tf.unstack(tf.shape(tensor))
- dims = [s[1] if s[0] is None else s[0]
- for s in zip(static_shape, dynamic_shape)]
- return dims
現在設想:我們想通過折疊第二維和第三維來把一個 3 維的矩陣轉換成一個 2 維的矩陣。我們可以使用上述的 get_shape() 函數來完成這件事:
- b = placeholder([None, 10, 32])
- shape = get_shape(tensor)
- b = tf.reshape(b, [shape[0], shape[1] * shape[2]])
值得注意的是,這里無論矩陣是不是靜態的,這個方法都能奏效。
事實上我們可以寫一個更加具有通用目標的函數來折疊任何幾個維度:
- import tensorflow as tfimport numpy as np
- def reshape(tensor, dims_list):
- shape = get_shape(tensor)
- dims_prod = []
- for dims in dims_list:
- if isinstance(dims, int):
- dims_prod.append(shape[dims])
- elif all([isinstance(shape[d], int) for d in dims]):
- dims_prod.append(np.prod([shape[d] for d in dims]))
- else:
- dims_prod.append(tf.prod([shape[d] for d in dims]))
- tensor = tf.reshape(tensor, dims_prod)
- return tensor
然后折疊第二個維度就變得非常容易了。
- b = placeholder([None, 10, 32])
- b = tf.reshape(b, [0, [1, 2]])
廣播操作
TensorFlow 支持廣播逐個元素的操作。正常情況下,當你想執行類似于加法和乘法的操作時,你需要確保算子的維度是匹配的。例如,你不能把一個維度為 [3,2] 的張量與一個維度為 [3,4] 的張量相加。但是在一個特殊的情況下你可以使用異常的維度。TensorFlow 會隱式地把一個張量的異常維度調整到與另一個算子相匹配的維度以實現維度兼容。所以將一個維度為 [3,2] 的張量與一個維度為 [3,1] 的張量相加是合法的。
- import tensorflow as tf
- a = tf.constant([[1., 2.], [3., 4.]])
- b = tf.constant([[1.], [2.]])# c = a + tf.tile(a, [1, 2])c = a +
廣播允許我們執行隱式調整,這能夠讓代碼更短,更加高效地使用內存,因為我們不需要存儲調整操作中間結果的內存開銷。這個方法可以被用在一個場景中:那就是結合不同長度的特征。為了連接不同長度的特征,我們通常會把輸入張量進行調整,然后把結果連接起來并應用一些非線性處理方法。這是很多神經網絡中的常用方法。
- a = tf.random_uniform([5, 3, 5])
- b = tf.random_uniform([5, 1, 6])
- # concat a and b and apply nonlinearity
- tiled_b = tf.tile(b, [1, 3, 1])
- c = tf.concat([a, tiled_b], 2)
- d = tf.layers.dense(c, 10, activation=tf.nn.relu)
但是這個可以用廣播的方法做得更加有效。我們可以利用 f(m(x + y)) 等價于 f(mx + my) 這一事實。所以我們可以將線性操作分開處理,然后使用廣播的方法去做隱式的連接。
- pa = tf.layers.dense(a, 10, activation=None)
- pb = tf.layers.dense(b, 10, activation=None)
- d = tf.nn.relu(pa + pb)
事實上這段代碼是相當通用的,只要張量之間能夠進行廣播操作,它就能夠被用于任何維度的張量上。
- def tile_concat_dense(a, b, units, activation=tf.nn.relu):
- pa = tf.layers.dense(a, units, activation=None)
- pb = tf.layers.dense(b, units, activation=None)
- c = pa + pb if activation is not None:
- c = activation(c)
- return c
到目前為止,我們討論了廣播操作的好的一面。你會問,那么它不好的一面是什么呢?隱式的假設總會讓調試變得更加難。看一下下面的例子:
- a = tf.constant([[1.], [2.]])
- b = tf.constant([1., 2.])
- c = tf.reduce_sum(a + b)
你認為 c 的值會是多少呢?如果你說是 6,那你就錯了。結果會是 12。這是因為當兩個張量的秩不匹配的時候,TensorFlow 就會自動地以較低的維度來擴展***維的大小,所以加法的結果會變成 [[2,3],[3,4]],所以在全體參數上的求和操作會給出 12 的結果。
避免這個問題的辦法就是盡可能地顯示化。如果我們顯示地指定了要將哪個維度進行求和,解決這個問題就會變得很容易了。
- a = tf.constant([[1.], [2.]])
- b = tf.constant([1., 2.])
- c = tf.reduce_sum(a + b, 0)
現在 c 的值會是 [5,7],考慮到輸出結果的維度,我們會立即猜想是不是哪里出了錯。一般的經驗法則就是在求和操作以及使用 tf.squeeze() 的時候總要指定具體的維度。
原型內核與 Python 操作下的高度可視化
為了更高的效率,TensorFlow 的運算內核是用 C++編寫的。但是用 C++寫 TensorFlow 內核是一件痛苦的事情。所以,在你實現內核之前,你也許會想著快速地實現一個原型系統。借助于 tf.py_func() 函數,你可以將任何一段 Python 代碼轉化成 TensorFlow 操作。
例如,下面的例子展示了如何在 TensorFlow 中使用 Python 操作來實現一個簡單的 ReLU 非線性核。
- import numpy as np
- import tensorflow as tf
- import uuiddef relu(inputs):
- # Define the op in python
- def _relu(x):
- return np.maximum(x, 0.)
- # Define the op's gradient in python
- def _relu_grad(x): return np.float32(x > 0)
- # An adapter that defines a gradient op compatible with Tensorflow
- def _relu_grad_op(op, grad):
- x = op.inputs[0]
- x_grad = grad * tf.py_func(_relu_grad, [x], tf.float32)
- return x_grad
- # Register the gradient with a unique id
- grad_name = "MyReluGrad_" + str(uuid.uuid4())
- tf.RegisterGradient(grad_name)(_relu_grad_op)
- # Override the gradient of the custom op
- g = tf.get_default_graph()
- with g.gradient_override_map({"PyFunc": grad_name}):
- output = tf.py_func(_relu, [inputs], tf.float32)
- return output
你可以使用 TensorFlow 的梯度檢查器來驗證梯度是否正確:
- x = tf.random_normal([10])
- y = relu(x * x)with tf.Session():
- diff = tf.test.compute_gradient_error(x, [10], y, [10])
- print(diff)
函數 compute_gradient_error() 會計算出梯度的數值,并且返回與給定梯度相比的差別。我所期望的是一個很小的差距。
需要注意的是,這個實現是相當低效的,并且僅對原型開發有用,因為 Python 代碼并不是能夠并行的,也無法在 GPU 上運行。一旦你驗證了自己的思想,你肯定會想著把它寫成一個 c++內核。
在實踐中,我們通常會在 Tensorboard 上使用 Python 操作來實現可視化。假設你在構建一個圖像分類的模型,并且想要在訓練的過程中可視化模型的預測結果。TensorFlow 允許使用 tf.summary.image() 函數來做可視化。
- image = tf.placeholder(tf.float32)
- tf.summary.image("image", image)
但是這僅僅會可視化輸入圖像。為了可視化預測結果,你必須尋求一種能夠做圖像注解的方式,這種方式幾乎在現有的操作中根本就不存在。一種比較容易的方法就是在 Python 中畫圖,然后用 Python 操作將其封裝起來。
- import io
- import matplotlib.pyplot as plt
- import numpy as npimport PIL
- import tensorflow as tf
- def visualize_labeled_images(images, labels, max_outputs=3, name='image'):
- def _visualize_image(image, label):
- # Do the actual drawing in python
- fig = plt.figure(figsize=(3, 3), dpi=80)
- ax = fig.add_subplot(111)
- ax.imshow(image[::-1,...])
- ax.text(0, 0, str(label),
- horizontalalignment='left',
- verticalalignment='top')
- fig.canvas.draw()
- # Write the plot as a memory file.
- buf = io.BytesIO()
- data = fig.savefig(buf, format='png')
- buf.seek(0)
- # Read the image and convert to numpy array
- img = PIL.Image.open(buf)
- return np.array(img.getdata()).reshape(img.size[0], img.size[1], -1)
- def _visualize_images(images, labels):
- # Only display the given number of examples in the batch
- outputs = []
- for i in range(max_outputs):
- output = _visualize_image(images[i], labels[i])
- outputs.append(output)
- return np.array(outputs, dtype=np.uint8)
- # Run the python op.
- figs = tf.py_func(_visualize_images, [images, labels], tf.uint8)
- return tf.summary.image(name, figs)
要注意,因為這里的 summary 通常都會隔一段時間才評估一次(并不是每一步都有),所以這個方法是實用的,而不用擔心由此引發的效率問題。
原文:https://github.com/vahidk/EffectiveTensorflow
【本文是51CTO專欄機構“機器之心”的原創譯文,微信公眾號“機器之心( id: almosthuman2014)”】