Flask Quickstart
This guide explains how to stream inferences from any Flask server to a Parquet file on S3.
To follow along, you'll need:
- A Kubernetes cluster
- Kafka - with Schema Registry, Kafka Connect, and the Confluent S3 Sink connector plugin
To get started as quickly as possible, see the Kafka deployment tutorial, which shows how to set up Kafka in minutes.
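If you don't have Kafka running yet, one possible shortcut is the Confluent Platform Helm chart. This is only a sketch, not the setup from the tutorial: the release name kafka is an assumption, chosen because it produces service names such as kafka-cp-kafka:9092 that match the rest of this guide, and the Schema Registry, Kafka Connect, and S3 Sink connector configuration is covered by the tutorial itself.
helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/
helm repo update
helm install kafka confluentinc/cp-helm-charts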
First, you'll need to install the kafka-python library:
pip install kafka-python
Then, whenever you have a new prediction, you can publish it to a Kafka topic. With Flask, it looks like this:
import json
import uuid

from flask import Flask
from kafka import KafkaProducer

app = Flask(__name__)

# Reuse a single producer for the lifetime of the process
producer = KafkaProducer(bootstrap_servers="kafka-cp-kafka:9092")

@app.route("/predict", methods=["POST"])
def predict():
    ...
    # Publish the inference to the "my-model" Kafka topic
    producer.send("my-model", json.dumps({
        "id": str(uuid.uuid4()),
        "model_name": "my model",
        "model_version": "v1",
        "inputs": [{
            "age": 38,
            "previously_insured": True,
        }],
        "outputs": [{
            "will_buy_insurance": True,
            "confidence": 0.98,
        }],
    }).encode("ascii"))
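If the Flask development server is running locally (port 5000 by default), a request like the one below would trigger the producer. The request body here is purely illustrative - the snippet above doesn't show how the endpoint parses its input, so adapt it to whatever your /predict route actually expects.
curl -X POST http://localhost:5000/predict \
  -H "Content-Type: application/json" \
  -d '{"age": 38, "previously_insured": true}'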
Finally, we can create an InferenceLogger resource to stream the predictions to S3 using InferenceDB:
apiVersion: inferencedb.aporia.com/v1alpha1
kind: InferenceLogger
metadata:
  name: my-model
  namespace: default
spec:
  topic: my-model
  events:
    type: json
    config: {}
  destination:
    type: confluent-s3
    config:
      url: s3://aporia-data/inferencedb
      format: parquet

  # Optional - Only if you want to override column names
  # schema:
  #   type: avro
  #   config:
  #     columnNames:
  #       inputs: [sepal_width, petal_width, sepal_length, petal_length]
  #       outputs: [flower]
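Save the manifest to a file and apply it to the cluster (the file name inference-logger.yaml is just an example):
kubectl apply -f inference-logger.yaml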
If everything was configured correctly, these predictions should now be logged to a Parquet file in S3. You can read it back with pandas (reading s3:// paths requires an S3 filesystem backend such as s3fs):
import pandas as pd
df = pd.read_parquet("s3://aporia-data/inferencedb/default-my-model/")
print(df)