If you have not heard of TensorFlow, please teach me the ways of the Jedi! From the project website:
TensorFlow is an end-to-end open source platform for machine learning. It has a comprehensive, flexible ecosystem of tools, libraries and community resources that lets researchers push the state-of-the-art in ML and developers easily build and deploy ML powered applications.
People on the street `import tensorflow as tf`, and so will the code below. Frankly, I am not a machine learning engineer, but I thought it would be cool to download the new Apache Kafka® I/O wrappers for producing and consuming TensorFlow-native records straight from Kafka, without intermediaries, and check whether they work with Redpanda... and of course it worked on the first try :D
Install Redpanda
In this tutorial, I'll only cover the Ubuntu install, but head over to our GitHub repo, give us a star, and see the download instructions for your operating system.
curl -1sLf \
'https://packages.vectorized.io/nzc4ZYQK3WRGd9sy/redpanda/cfg/setup/bash.deb.sh' \
| sudo -E bash
sudo apt-get install redpanda && sudo systemctl start redpanda
Install deps on your machine
pip3 install --user kafka-python tensorflow-io scikit-learn pandas
Copy this gist
I copied this gist from the main GitHub repository of the TensorFlow I/O project: https://github.com/tensorflow/io
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio
# Log the installed library versions, which helps when reproducing a run.
print("tensorflow-io version:", tfio.__version__)
print("tensorflow version:", tf.__version__)
# Column names for SUSY.csv.gz, in file order. The first column is the label;
# the remaining 18 are features (8 low-level, then 10 high-level derived).
COLUMNS = (
    # labels
    ["class"]
    # low-level features
    + [
        "lepton_1_pT", "lepton_1_eta", "lepton_1_phi",
        "lepton_2_pT", "lepton_2_eta", "lepton_2_phi",
        "missing_energy_magnitude", "missing_energy_phi",
    ]
    # high-level derived features
    + [
        "MET_rel", "axial_MET", "M_R", "M_TR_2", "R", "MT2",
        "S_R", "M_Delta_R", "dPhi_r_b", "cos(theta_r1)",
    ]
)
# Load only the first 100,000 rows of SUSY.csv.gz. The file has no header row,
# so the column names come from COLUMNS. The chunked reader holds the file open
# until closed, so close it as soon as the first chunk has been taken.
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
try:
    susy_df = next(susy_iterator)
finally:
    susy_iterator.close()
# Bare expressions have no effect when this runs as a script, so print the
# sanity checks explicitly.
print(susy_df.head())
# Number of datapoints and columns
print(len(susy_df), len(susy_df.columns))
# Number of datapoints belonging to each class (0: background noise, 1: signal)
print(len(susy_df[susy_df["class"] == 0]), len(susy_df[susy_df["class"] == 1]))
# Hold out 40% of the rows for evaluation, shuffling before the split.
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ", len(train_df))
print("Number of testing sample: ", len(test_df))
# Separate the feature columns from the "class" label column for each split.
x_train_df, y_train_df = train_df.drop(columns=["class"]), train_df["class"]
x_test_df, y_test_df = test_df.drop(columns=["class"]), test_df["class"]
def _csv_rows(frame):
    """Serialize *frame* as CSV and return one string per data row.

    The header line and any empty lines are dropped. ``splitlines`` is used
    rather than ``split("\\n")`` so that a ``"\\r\\n"`` line terminator does
    not leave a stray ``"\\r"`` at the end of every row.
    """
    return [row for row in frame.to_csv(index=False).splitlines()[1:] if row]


# Each feature row becomes a Kafka message value and its label the message
# key. Kafka assigns partitions by key, but with only two distinct labels at
# most two partitions can receive data.
x_train = _csv_rows(x_train_df)
y_train = _csv_rows(y_train_df)
x_test = _csv_rows(x_test_df)
y_test = _csv_rows(y_test_df)
NUM_COLUMNS = len(x_train_df.columns)
print(len(x_train), len(y_train), len(x_test), len(y_test))
def error_callback(exc):
    """Errback for ``KafkaProducer.send`` futures: raise on a failed send.

    Args:
        exc: the failure reported by the send future.

    Raises:
        Exception: always, with *exc* in the message and as ``__cause__``
            (when *exc* is an exception instance).
    """
    # NOTE(review): kafka-python invokes errbacks from its own future handling,
    # so this exception is likely logged there rather than propagated to the
    # caller of send(). Confirm against the kafka-python version in use.
    cause = exc if isinstance(exc, BaseException) else None
    raise Exception('Error while sending data to kafka: {0}'.format(exc)) from cause
def write_to_kafka(topic_name, items):
    """Publish ``(message, key)`` string pairs to a Kafka topic.

    Both parts of each pair are UTF-8 encoded before sending, and failed sends
    are routed to ``error_callback``. Pending sends are flushed before the
    count is reported, and the producer is always closed.

    Args:
        topic_name: destination topic.
        items: iterable of ``(message, key)`` string pairs.
    """
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    try:
        for message, key in items:
            producer.send(
                topic_name,
                key=key.encode('utf-8'),
                value=message.encode('utf-8'),
            ).add_errback(error_callback)
            count += 1
        producer.flush()
    finally:
        # Release the producer's connections even if encoding or send() raised.
        producer.close()
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))
# Publish both splits, train first: feature rows as message values, labels as keys.
for _topic, _pairs in (("susy-train", zip(x_train, y_train)),
                       ("susy-test", zip(x_test, y_test))):
    write_to_kafka(_topic, _pairs)
def decode_kafka_item(item):
    """Turn one record from ``IODataset.from_kafka`` into ``(features, label)``.

    ``item.message`` is parsed as a CSV line of NUM_COLUMNS float fields (empty
    fields fall back to 0.0); ``tf.io.decode_csv`` yields one tensor per column.
    ``item.key`` holds the label text and is converted with
    ``tf.strings.to_number``.
    """
    record_defaults = [[0.0] for _ in range(NUM_COLUMNS)]
    features = tf.io.decode_csv(item.message, record_defaults)
    label = tf.strings.to_number(item.key)
    return features, label
BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64
# Read "susy-train" from offset 0 of partition 0, shuffle the raw records,
# decode each into (features, label) and group them into batches.
# NOTE(review): only partition 0 is read; if the topic has more partitions,
# their records are skipped. Confirm the topic's partition count.
train_ds = (
    tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
    .shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
    .map(decode_kafka_item)
    .batch(BATCH_SIZE)
)
OPTIMIZER = "adam"
# The final Dense layer already applies a sigmoid, so the model outputs
# probabilities. The loss must therefore use from_logits=False; from_logits=True
# would apply a second sigmoid inside the loss.
LOSS = tf.keras.losses.BinaryCrossentropy(from_logits=False)
METRICS = ['accuracy']
EPOCHS = 10
# design/build the model: an MLP over the NUM_COLUMNS features with dropout
# between hidden layers and a single sigmoid output for the binary label.
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(1, activation='sigmoid'),
])
# summary() prints the table itself and returns None; wrapping it in print()
# only adds a stray "None" line.
model.summary()
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
# Consumer-group dataset over the held-out "susy-test" topic. With
# auto.offset.reset=earliest, a group without a committed offset starts at the
# oldest message. stream_timeout is in milliseconds.
# NOTE(review): auto.offset.reset only applies when group "testcg" has no
# committed offset, so a second run with the same group_id may read nothing.
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    servers="127.0.0.1:9092",
    group_id="testcg",
    topics=["susy-test"],
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
    ],
    stream_timeout=10000,
)
def decode_kafka_test_item(raw_message, raw_key):
    """Decode one ``(message, key)`` element of the test consumer-group dataset.

    Returns ``(features, label)``: *features* is *raw_message* parsed as CSV
    with NUM_COLUMNS float fields (empty fields default to 0.0), and *label*
    is *raw_key* converted with ``tf.strings.to_number``.
    """
    features = tf.io.decode_csv(
        raw_message,
        [[0.0] for _unused in range(NUM_COLUMNS)],
    )
    label = tf.strings.to_number(raw_key)
    return features, label
# Decode the raw (message, key) records, batch them with the training batch
# size and report loss/accuracy on the held-out split.
test_ds = test_ds.map(decode_kafka_test_item).batch(BATCH_SIZE)
res = model.evaluate(test_ds)
print("test loss, test acc:", res)
# Streaming consumer for online training. Group "cgonline" reads "susy-train",
# and each element it yields is itself a small dataset of raw records (the
# training loop shuffles/maps/batches each one).
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    servers="127.0.0.1:9092",
    group_id="cgonline",
    topics=["susy-train"],
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
    ],
    stream_timeout=30000,  # in milliseconds, to block indefinitely, set it to -1.
)
def error_callback(exc):
    """Errback for ``KafkaProducer.send`` futures: raise on a failed send.

    Args:
        exc: the failure reported by the send future.

    Raises:
        Exception: always, with *exc* in the message and as ``__cause__``
            (when *exc* is an exception instance).
    """
    # NOTE(review): kafka-python invokes errbacks from its own future handling,
    # so this exception is likely logged there rather than propagated to the
    # caller of send(). Confirm against the kafka-python version in use.
    cause = exc if isinstance(exc, BaseException) else None
    raise Exception('Error while sending data to kafka: {0}'.format(exc)) from cause
def write_to_kafka_after_sleep(topic_name, items, delay=30):
    """Sleep, then publish ``(message, key)`` string pairs to a Kafka topic.

    This is started in a background thread so that new records arrive on the
    topic while the streaming consumer is already reading it.

    Args:
        topic_name: destination topic.
        items: iterable of ``(message, key)`` string pairs; both parts are
            UTF-8 encoded before sending, and failures go to ``error_callback``.
        delay: seconds to sleep before producing (default 30).
    """
    time.sleep(delay)
    print("#"*100)
    print("Writing messages into topic: {0} after a nice sleep !".format(topic_name))
    print("#"*100)
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    try:
        for message, key in items:
            producer.send(
                topic_name,
                key=key.encode('utf-8'),
                value=message.encode('utf-8'),
            ).add_errback(error_callback)
            count += 1
        producer.flush()
    finally:
        # Release the producer's connections even if encoding or send() raised.
        producer.close()
    print("#"*100)
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))
    print("#"*100)
def decode_kafka_online_item(raw_message, raw_key):
    """Decode one ``(message, key)`` record from an online mini-dataset.

    The message is parsed as a CSV line of NUM_COLUMNS float fields (empty
    fields become 0.0) and the key is converted to a number. The result is
    returned as the ``(features, label)`` pair.
    """
    defaults = [[0.0] for _col in range(NUM_COLUMNS)]
    decoded = tf.io.decode_csv(raw_message, defaults)
    target = tf.strings.to_number(raw_key)
    return decoded, target
# Replay the training rows into "susy-train" from a daemon thread (after its
# built-in sleep) while the loop below is consuming the stream.
thread = threading.Thread(
    target=write_to_kafka_after_sleep,
    args=("susy-train", zip(x_train, y_train)),
    daemon=True,
)
thread.start()
# Each element of online_train_ds is a small dataset of raw records: shuffle,
# decode and batch it, then train incrementally for a few epochs.
for mini_ds in online_train_ds:
    mini_ds = mini_ds.shuffle(buffer_size=32).map(decode_kafka_online_item).batch(32)
    model.fit(mini_ds, epochs=3)
Download the data (the script reads SUSY.csv.gz from the current working directory, so do this before running it)
curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
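Profit!
I had to install the CUDA libraries to get my GPU humming, but after a quick install of those I was up and running! (In the run captured below, the CUDA driver and kernel-module versions don't match, as the `cuInit` and "kernel version ... does not match DSO version" errors show, so that particular run trained on the CPU. It still works.)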
❯ python3 main.py
2021-01-11 22:23:02.910794: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
tensorflow-io version: 0.17.0
tensorflow version: 2.4.0
Number of training samples: 60000
Number of testing sample: 40000
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test
2021-01-11 22:23:10.847745: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 FMA
2021-01-11 22:23:10.902048: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:10.902534: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-11 22:23:10.928441: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-11 22:23:10.928471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: darktower
2021-01-11 22:23:10.928483: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: darktower
2021-01-11 22:23:10.928573: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 460.27.4
2021-01-11 22:23:10.928598: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 455.28.0
2021-01-11 22:23:10.928608: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 455.28.0 does not match DSO version 460.27.4 -- cannot find working devices in this configuration
2021-01-11 22:23:10.931185: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:11.935134: I tensorflow_io/core/kernels/kafka_kernels.cc:349] Kafka tail: 59449
Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
dense (Dense) (None, 128) 2432
_________________________________________________________________
dropout (Dropout) (None, 128) 0
_________________________________________________________________
dense_1 (Dense) (None, 256) 33024
_________________________________________________________________
dropout_1 (Dropout) (None, 256) 0
_________________________________________________________________
dense_2 (Dense) (None, 128) 32896
_________________________________________________________________
dropout_2 (Dropout) (None, 128) 0
_________________________________________________________________
dense_3 (Dense) (None, 1) 129
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
2021-01-11 22:23:12.036891: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-11 22:23:12.038427: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2894530000 Hz
Epoch 1/10
2021-01-11 22:23:12.283066: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 0
1/Unknown - 0s 415ms/step - loss: 0.7726 - accuracy: 0.37502021-01-11 22:23:12.518990: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 1024
29/Unknown - 1s 20ms/step - loss: 0.6915 - accuracy: 0.54202021-01-11 22:23:13.043534: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 2048
46/Unknown - 2s 25ms/step - loss: 0.6721 - accuracy: 0.57392021-01-11 22:23:13.575452: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 3072
62/Unknown - 2s 27ms/step - loss: 0.6572 - accuracy: 0.59512021-01-11 22:23:14.102275: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 4096
78/Unknown - 3s 28ms/step - loss: 0.6444 - accuracy: 0.61152021-01-11 22:23:14.638575: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 5120
93/Unknown - 3s 29ms/step - loss: 0.6345 - accuracy: 0.62332021-01-11 22:23:15.167787: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 6144
109/Unknown - 4s 30ms/step - loss: 0.6251 - accuracy: 0.63442021-01-11 22:23:15.696971: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 7168
126/Unknown - 4s 30ms/step - loss: 0.6165 - accuracy: 0.64432021-01-11 22:23:16.213187: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 8192
128/Unknown - 5s 33ms/step - loss: 0.6155 - accuracy: 0.64542021-01-11 22:23:16.733228: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 9216
144/Unknown - 5s 33ms/step - loss: 0.6086 - accuracy: 0.65312021-01-11 22:23:17.252568: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 10240
160/Unknown - 6s 33ms/step - loss: 0.6024 - accuracy: 0.65982021-01-11 22:23:17.771701: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 11264
190/Unknown - 6s 31ms/step - loss: 0.5925 - accuracy: 0.67032021-01-11 22:23:18.295851: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 12288
192/Unknown - 7s 33ms/step - loss: 0.5920 - accuracy: 0.67092021-01-11 22:23:18.815791: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 13312
222/Unknown - 7s 31ms/step - loss: 0.5842 - accuracy: 0.67902021-01-11 22:23:19.343171: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 14336
224/Unknown - 8s 33ms/step - loss: 0.5837 - accuracy: 0.67952021-01-11 22:23:19.857738: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 15360
254/Unknown - 8s 31ms/step - loss: 0.5770 - accuracy: 0.68632021-01-11 22:23:20.379426: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 16384
270/Unknown - 9s 31ms/step - loss: 0.5737 - accuracy: 0.68952021-01-11 22:23:20.899491: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 17408
286/Unknown - 9s 31ms/step - loss: 0.5706 - accuracy: 0.69252021-01-11 22:23:21.420403: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 18432
302/Unknown - 10s 31ms/step - loss: 0.5677 - accuracy: 0.69522021-01-11 22:23:21.941930: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 19456
304/Unknown - 10s 33ms/step - loss: 0.5674 - accuracy: 0.69562021-01-11 22:23:22.461011: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 20480
Note: I timed myself copying the gists into my terminal, and it took a bit less than 3 minutes.
Sign up for our Community Slack (here!) and engage with us on Twitter via @redpandadata or personally at @emaxerrno
Let's keep in touch
Subscribe and never miss another blog post, announcement, or community event. We hate spam and will never sell your contact information.