Hello world! with TensorFlow and Redpanda

By Alexander Gallego on January 11, 2021
Realtime ML in 3 mins with TensorFlow and Redpanda

If you have not heard of TensorFlow, please teach me the ways of the Jedi! From the project website:

TensorFlow is an end-to-end open source platform for machine learning. It has a comprehensive, flexible ecosystem of tools, libraries and community resources that lets researchers push the state-of-the-art in ML and developers easily build and deploy ML powered applications.

People on the street import tensorflow as tf, and so will the code below. Frankly, I am not a machine learning engineer, but I thought it would be cool to download the new Apache Kafka®-IO wrappers, which produce and consume TensorFlow's native record format straight from Kafka without intermediaries, and check whether they work with Redpanda... and of course they worked on the first try :D

Install Redpanda

In this tutorial, I'll only cover the Ubuntu install, but head over to our GitHub repo, give us a star, and see the download instructions for your operating system.

curl -1sLf \
  'https://packages.vectorized.io/nzc4ZYQK3WRGd9sy/redpanda/cfg/setup/bash.deb.sh' \
  | sudo -E bash
sudo apt-get install redpanda && sudo systemctl start redpanda
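
Before moving on, it can help to confirm that something is actually listening on the broker port. Here is a minimal sketch using only the Python standard library. It assumes the broker listens on 127.0.0.1:9092 (the address the script in this post connects to), and the helper name broker_reachable is made up for illustration. It only checks that a TCP connection opens; it does not speak the Kafka protocol.

```python
import socket


def broker_reachable(host: str = "127.0.0.1", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection raises an OSError subclass on refusal or timeout.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumption: Redpanda is listening on the address used later in this post.
    print("broker reachable:", broker_reachable())
```

If this prints False, checking the service with systemctl status redpanda is a reasonable next step.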

Install deps on your machine

pip3 install --user kafka-python tensorflow-io scikit-learn pandas
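
A quick way to see whether the install worked is to check that each import name resolves. This is a small sketch, not part of the original gist; the helper name missing_modules is hypothetical. Note that the import names differ from the pip names: kafka-python is imported as kafka, tensorflow-io as tensorflow_io, and scikit-learn as sklearn.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of top-level module `names` that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Import names used by the script in this post.
    required = ["kafka", "tensorflow", "tensorflow_io", "sklearn", "pandas"]
    missing = missing_modules(required)
    print("missing:", missing if missing else "none")
```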

Copy this gist

I copied this gist from the main GitHub repo of the new tensorflow/io organization: https://github.com/tensorflow/io

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))

COLUMNS = [
    # labels
    'class',
    # low-level features
    'lepton_1_pT',
    'lepton_1_eta',
    'lepton_1_phi',
    'lepton_2_pT',
    'lepton_2_eta',
    'lepton_2_phi',
    'missing_energy_magnitude',
    'missing_energy_phi',
    # high-level derived features
    'MET_rel',
    'axial_MET',
    'M_R',
    'M_TR_2',
    'R',
    'MT2',
    'S_R',
    'M_Delta_R',
    'dPhi_r_b',
    'cos(theta_r1)'
]

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()

# Number of datapoints and columns
len(susy_df), len(susy_df.columns)

# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))

NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)

def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
    count=0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
        count+=1
    producer.flush()
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))

def decode_kafka_item(item):
    message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(item.key)
    return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10

# design/build the model
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())

# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

# fit the model
model.fit(train_ds, epochs=EPOCHS)

test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)

res = model.evaluate(test_ds)
print("test loss, test acc:", res)

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=30000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))

def write_to_kafka_after_sleep(topic_name, items):
    time.sleep(30)
    print("#"*100)
    print("Writing messages into topic: {0} after a nice sleep !".format(topic_name))
    print("#"*100)
    count=0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        producer.send(topic_name,
                      key=key.encode('utf-8'),
                      value=message.encode('utf-8')
                      ).add_errback(error_callback)
        count+=1
    producer.flush()
    print("#"*100)
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))
    print("#"*100)

def decode_kafka_online_item(raw_message, raw_key):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    return (message, key)

thread = threading.Thread(target=write_to_kafka_after_sleep,
                          args=("susy-train", zip(x_train, y_train)))
thread.daemon = True
thread.start()

for mini_ds in online_train_ds:
    mini_ds = mini_ds.shuffle(buffer_size=32)
    mini_ds = mini_ds.map(decode_kafka_online_item)
    mini_ds = mini_ds.batch(32)
    model.fit(mini_ds, epochs=3)
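
The gist stores each sample as one Kafka message: the value is the feature row as a CSV string and the key is the label, both UTF-8 encoded. To make that layout concrete, here is a pure-Python sketch of the round trip. The function names encode_record and decode_record are hypothetical, and the decoding side is only a stand-in for what tf.io.decode_csv and tf.strings.to_number do in the gist, not a replacement for them.

```python
def encode_record(features, label):
    """Encode one sample as (key_bytes, value_bytes): label as key, CSV row as value."""
    value = ",".join(repr(float(x)) for x in features).encode("utf-8")
    key = repr(float(label)).encode("utf-8")
    return key, value


def decode_record(key, value, num_columns):
    """Parse the CSV value and the label key back into floats."""
    fields = value.decode("utf-8").split(",")
    if len(fields) != num_columns:
        raise ValueError(f"expected {num_columns} fields, got {len(fields)}")
    return [float(field) for field in fields], float(key.decode("utf-8"))


# Tiny three-feature example (the real rows have 18 features).
key, value = encode_record([0.5, -1.25, 3.0], 1)
print(key, value)
print(decode_record(key, value, num_columns=3))
```

Keeping the label in the key means the consumer side never has to split it back out of the feature row.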

Download the data

curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
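
Since the script reads SUSY.csv.gz from the current directory, a quick sanity check on the download can save a confusing pandas error later. The sketch below reads only the first row and checks that it has 19 numeric fields (1 label plus the 18 features listed in COLUMNS). The helper names first_row and looks_like_susy are made up for this example.

```python
import csv
import gzip

EXPECTED_FIELDS = 19  # 1 label + 18 features, matching COLUMNS in the gist


def first_row(path):
    """Return the first CSV row of a gzip-compressed file as a list of strings."""
    with gzip.open(path, mode="rt", newline="") as handle:
        return next(csv.reader(handle))


def looks_like_susy(path, expected_fields=EXPECTED_FIELDS):
    """Return True if the first row has the expected number of numeric fields."""
    row = first_row(path)
    if len(row) != expected_fields:
        return False
    try:
        for field in row:
            float(field)  # every column in the dataset should parse as a number
    except ValueError:
        return False
    return True
```

Calling looks_like_susy('SUSY.csv.gz') after the download should return True if the file arrived intact enough to read its first row.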

Profit!

I had to install the CUDA libs to get my GPU humming. After a quick install, though, I was up and running!

❯ python3 main.py
2021-01-11 22:23:02.910794: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
tensorflow-io version: 0.17.0
tensorflow version: 2.4.0
Number of training samples: 60000
Number of testing sample: 40000
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test
2021-01-11 22:23:10.847745: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 FMA
2021-01-11 22:23:10.902048: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:10.902534: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-11 22:23:10.928441: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-11 22:23:10.928471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: darktower
2021-01-11 22:23:10.928483: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: darktower
2021-01-11 22:23:10.928573: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 460.27.4
2021-01-11 22:23:10.928598: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 455.28.0
2021-01-11 22:23:10.928608: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 455.28.0 does not match DSO version 460.27.4 -- cannot find working devices in this configuration
2021-01-11 22:23:10.931185: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:11.935134: I tensorflow_io/core/kernels/kafka_kernels.cc:349] Kafka tail: 59449
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
dense (Dense)                (None, 128)               2432
_________________________________________________________________
dropout (Dropout)            (None, 128)               0
_________________________________________________________________
dense_1 (Dense)              (None, 256)               33024
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0
_________________________________________________________________
dense_2 (Dense)              (None, 128)               32896
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 129
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
2021-01-11 22:23:12.036891: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-11 22:23:12.038427: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2894530000 Hz
Epoch 1/10
2021-01-11 22:23:12.283066: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 0
1/Unknown - 0s 415ms/step - loss: 0.7726 - accuracy: 0.3750
2021-01-11 22:23:12.518990: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 1024
29/Unknown - 1s 20ms/step - loss: 0.6915 - accuracy: 0.5420
2021-01-11 22:23:13.043534: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 2048
46/Unknown - 2s 25ms/step - loss: 0.6721 - accuracy: 0.5739
2021-01-11 22:23:13.575452: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 3072
62/Unknown - 2s 27ms/step - loss: 0.6572 - accuracy: 0.5951
2021-01-11 22:23:14.102275: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 4096
78/Unknown - 3s 28ms/step - loss: 0.6444 - accuracy: 0.6115
2021-01-11 22:23:14.638575: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 5120
93/Unknown - 3s 29ms/step - loss: 0.6345 - accuracy: 0.6233
2021-01-11 22:23:15.167787: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 6144
109/Unknown - 4s 30ms/step - loss: 0.6251 - accuracy: 0.6344
2021-01-11 22:23:15.696971: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 7168
126/Unknown - 4s 30ms/step - loss: 0.6165 - accuracy: 0.6443
2021-01-11 22:23:16.213187: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 8192
128/Unknown - 5s 33ms/step - loss: 0.6155 - accuracy: 0.6454
2021-01-11 22:23:16.733228: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 9216
144/Unknown - 5s 33ms/step - loss: 0.6086 - accuracy: 0.6531
2021-01-11 22:23:17.252568: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 10240
160/Unknown - 6s 33ms/step - loss: 0.6024 - accuracy: 0.6598
2021-01-11 22:23:17.771701: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 11264
190/Unknown - 6s 31ms/step - loss: 0.5925 - accuracy: 0.6703
2021-01-11 22:23:18.295851: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 12288
192/Unknown - 7s 33ms/step - loss: 0.5920 - accuracy: 0.6709
2021-01-11 22:23:18.815791: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 13312
222/Unknown - 7s 31ms/step - loss: 0.5842 - accuracy: 0.6790
2021-01-11 22:23:19.343171: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 14336
224/Unknown - 8s 33ms/step - loss: 0.5837 - accuracy: 0.6795
2021-01-11 22:23:19.857738: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 15360
254/Unknown - 8s 31ms/step - loss: 0.5770 - accuracy: 0.6863
2021-01-11 22:23:20.379426: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 16384
270/Unknown - 9s 31ms/step - loss: 0.5737 - accuracy: 0.6895
2021-01-11 22:23:20.899491: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 17408
286/Unknown - 9s 31ms/step - loss: 0.5706 - accuracy: 0.6925
2021-01-11 22:23:21.420403: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 18432
302/Unknown - 10s 31ms/step - loss: 0.5677 - accuracy: 0.6952
2021-01-11 22:23:21.941930: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 19456
304/Unknown - 10s 33ms/step - loss: 0.5674 - accuracy: 0.6956
2021-01-11 22:23:22.461011: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 20480
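
The parameter counts in the model summary above can be checked by hand: a fully connected layer has one weight per input-unit pair plus one bias per unit. The short sketch below redoes that arithmetic for the four Dense layers, using 18 input features (NUM_COLUMNS in the gist). The helper names are made up for illustration; Dropout layers are skipped because they have no parameters.

```python
def dense_params(n_inputs, n_units):
    """Weights (n_inputs * n_units) plus one bias per unit."""
    return n_inputs * n_units + n_units


def sequential_dense_params(n_features, layer_units):
    """Parameter count of each Dense layer when layers are stacked in order."""
    counts = []
    n_inputs = n_features
    for units in layer_units:
        counts.append(dense_params(n_inputs, units))
        n_inputs = units  # this layer's output feeds the next layer
    return counts


counts = sequential_dense_params(18, [128, 256, 128, 1])
print(counts, sum(counts))  # [2432, 33024, 32896, 129] 68481
```

The per-layer numbers and the 68,481 total match the summary printed by the run.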

Note: I did time myself while copying the gists into my terminal, and it took a bit less than 3 minutes.

Sign up for our Community Slack and engage with us on Twitter via @redpandadata, or reach me personally at @emaxerrno.
