TinyML with Arduino

Ant

In this article I will show how to build a TensorFlow Lite based jelly bear classifier using the Arduino Nano 33 BLE Sense.

Before you continue reading, please watch the short introduction:

Currently, machine learning solutions can be deployed not only on very powerful machines with GPU cards but also on really small devices. Of course, such devices have some limitations, e.g. memory. To deploy an ML model we need to prepare it first. The TensorFlow framework allows you to convert neural networks to TensorFlow Lite, which can be installed on edge devices, e.g. the Arduino Nano.

The Arduino Nano 33 BLE Sense is equipped with many sensors that enable a wide range of projects, e.g.:
* Digital microphone
* Digital proximity, ambient light, RGB and gesture sensor
* 3D magnetometer, 3D accelerometer, 3D gyroscope
* Capacitive digital sensor for relative humidity and temperature

The examples I have used in this project can be found here.

Arduino sensors

To simplify device usage I have built the Arduino Lab project, where you can test and investigate the listed sensors directly in the web browser.

The project dependencies are packed into a Docker image to simplify usage.

Before you start the project you will need to connect the Arduino through USB (the Arduino will communicate with the Docker container through /dev/ttyACM0):

git clone https://github.com/qooba/tinyml-arduino.git
cd tinyml-arduino
./run.server.sh
# in another terminal tab
./run.nginx.sh
# go inside server container 
docker exec -it arduino /bin/bash
./start.sh

For each sensor type you can click the Prepare button, which will build and deploy the appropriate Arduino code.


NOTE:
Sometimes you will have to deploy to the Arduino manually. To do this you will need to
go to the arduino container:

docker exec -it arduino /bin/bash
cd /arduino
make rgb

Here you have the complete Makefile with all implemented sensor types.


You can start observations using the Watch button.
Arduino pdm
Arduino temperature
Arduino rgb

Now we will build the TinyML solution.
In the first step we will capture the training data:
Arduino capture

The training data will be saved in CSV format. You will need to repeat the process for each class you want to detect.

The captured data will be uploaded to a Colab notebook.
Here I fully base on the project Fruit identification using Arduino and TensorFlow.
In the notebook we train the model using TensorFlow, then convert it to TensorFlow Lite and finally encode it to hex format (a model.h header file) which is readable by the Arduino.
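
That conversion step boils down to a few lines; below is a minimal sketch of it, assuming model is the trained Keras classifier from the notebook (file names are illustrative, and the C array is the same encoding that xxd -i would produce):

# A minimal sketch of the conversion step: Keras model -> TensorFlow Lite ->
# C byte array (model.h) that the Arduino sketch can include.
# Assumes `model` is the trained Keras classifier from the notebook.
import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

with open('model.tflite', 'wb') as f:
    f.write(tflite_model)

# encode the binary model as a C array readable by the Arduino sketch
hex_bytes = ', '.join(f'0x{b:02x}' for b in tflite_model)
with open('model.h', 'w') as f:
    f.write(f'const unsigned char model[] = {{{hex_bytes}}};\n')
    f.write(f'const unsigned int model_len = {len(tflite_model)};\n')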

Now we compile and upload the model.h header file using the drag and drop mechanism.

Arduino upload

Finally we can classify the jelly bears by color:

Arduino classify

Feast with AI – feed your MLflow models with feature store

feast

In this article I will show how to prepare a complete MLOps solution based on the Feast feature store and the MLflow platform.

Before you continue reading, please watch the short introduction:

The whole solution will be deployed on Kubernetes (mlflow_feast.yaml).

component

We will use:
* Feast – as the feature store
* MLflow – as the model repository
* Minio – as the S3 storage
* Jupyter Notebook – as the workspace
* Redis – as the online feature store

propensity to buy

To better visualize the whole process we will use a propensity to buy example, which builds on the Kaggle examples and data.

mlops

We start in a Jupyter notebook, where we prepare the Feast feature store schema, which is kept in S3.
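
A Feast repository definition for our data could look roughly like the sketch below; it only outlines the propensity_data feature view used later, the S3 path and TTL are illustrative, and argument names (e.g. batch_source vs. input) differ slightly between Feast versions:

# A sketch of the Feast repository definition for the propensity features.
# Names mirror the propensity_data feature view used below; the S3 path and
# TTL are illustrative and arguments vary slightly between Feast versions.
from datetime import timedelta
from feast import Entity, Feature, FeatureView, FileSource, ValueType

user = Entity(name="UserID", value_type=ValueType.STRING, description="user id")

propensity_source = FileSource(
    path="s3://propensity/training_sample.parquet",
    event_timestamp_column="event_timestamp",
)

propensity_data = FeatureView(
    name="propensity_data",
    entities=["UserID"],
    ttl=timedelta(days=90),
    features=[
        Feature(name="basket_icon_click", dtype=ValueType.FLOAT),
        Feature(name="basket_add_list", dtype=ValueType.FLOAT),
        Feature(name="ordered", dtype=ValueType.FLOAT),
        # ... the remaining propensity features follow the same pattern
    ],
    online=True,
    batch_source=propensity_source,
)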

We can simply inspect the Feast schema in Jupyter Notebook:

from feast import FeatureStore
from IPython.core.display import display, HTML
import json
from json2html import *
import warnings
warnings.filterwarnings('ignore')

class FeastSchema:
    def __init__(self, repo_path: str):
        self.store = FeatureStore(repo_path=repo_path)

    def show_schema(self, skip_meta: bool= False):
        feast_schema=self.__project_show_schema(skip_meta)        
        display(HTML(json2html.convert(json = feast_schema)))

    def show_table_schema(self, table: str, skip_meta: bool= False):
        feasture_tables_dictionary=self.__project_show_schema(skip_meta)
        display(HTML(json2html.convert(json = {table:feasture_tables_dictionary[table]})))

    def __project_show_schema(self, skip_meta: bool= False):
        entities_dictionary={}
        feast_entities=self.store.list_entities()
        for entity in feast_entities:
            entity_dictionary=entity.to_dict()
            entity_spec=entity_dictionary['spec']
            entities_dictionary[entity_spec['name']]=entity_spec

        feasture_tables_dictionary={}
        feast_feature_tables=self.store.list_feature_views()
        for feature_table in feast_feature_tables:
            feature_table_dict=json.loads(str(feature_table))
            feature_table_spec=feature_table_dict['spec']
            feature_table_name=feature_table_spec['name']
            feature_table_spec.pop('name',None)
            if 'entities' in feature_table_spec:
                feature_table_entities=[]
                for entity in feature_table_spec['entities']:
                    feature_table_entities.append(entities_dictionary[entity])
                feature_table_spec['entities']=feature_table_entities

            if not skip_meta:
                feature_table_spec['meta']=feature_table_dict['meta']
            else:
                feature_table_spec.pop('input',None)
                feature_table_spec.pop('ttl',None)
                feature_table_spec.pop('online',None)

            feasture_tables_dictionary[feature_table_name]=feature_table_spec

        return feasture_tables_dictionary




FeastSchema(".").show_schema()
#FeastSchema(".").show_schema(skip_meta=True)
#FeastSchema(".").show_table_schema('driver_hourly_stats')
#FeastSchema().show_tables()

In our case we store the data in Apache Parquet files in an S3 bucket.
Using Feast we can fetch the historical features and train the model using the scikit-learn library:

# imports used in this snippet (assumed from the notebook context)
import os
import joblib
import sklearn.metrics
import pandas as pd
from datetime import datetime
from pyarrow import fs
from feast import FeatureStore
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import train_test_split

bucket_name="propensity"
filename="training_sample"

store = FeatureStore(repo_path=".")

s3 = fs.S3FileSystem(endpoint_override=os.environ.get("FEAST_S3_ENDPOINT_URL"))
entity_df=pd.read_parquet(f'{bucket_name}/{filename}_entities.parquet', filesystem=s3)
entity_df["event_timestamp"]=datetime.now()


training_df = store.get_historical_features(
    entity_df=entity_df, 
    feature_refs = [
        'propensity_data:basket_icon_click',
        'propensity_data:basket_add_list',
        'propensity_data:basket_add_detail',
        'propensity_data:sort_by',
        'propensity_data:image_picker',
        'propensity_data:account_page_click',
        'propensity_data:promo_banner_click',
        'propensity_data:detail_wishlist_add',
        'propensity_data:list_size_dropdown',
        'propensity_data:closed_minibasket_click',
        'propensity_data:checked_delivery_detail',
        'propensity_data:checked_returns_detail',
        'propensity_data:sign_in',
        'propensity_data:saw_checkout',
        'propensity_data:saw_sizecharts',
        'propensity_data:saw_delivery',
        'propensity_data:saw_account_upgrade',
        'propensity_data:saw_homepage',
        'propensity_data:device_mobile',
        'propensity_data:device_computer',
        'propensity_data:device_tablet',
        'propensity_data:returning_user',
        'propensity_data:loc_uk',
        'propensity_data:ordered'
    ],
).to_df()

predictors = training_df.drop(['propensity_data__ordered','UserID','event_timestamp'], axis=1)
targets = training_df['propensity_data__ordered']

X_train, X_test, y_train, y_test = train_test_split(predictors, targets, test_size=.3)

# input_params holds the MLflow project parameters (e.g. var_smoothing=1e-9)
classifier=GaussianNB(var_smoothing=input_params['var_smoothing'])
classifier=classifier.fit(X_train,y_train)

predictions=classifier.predict(X_test)

conf_matrix=sklearn.metrics.confusion_matrix(y_test,predictions)
ac_score=sklearn.metrics.accuracy_score(y_test, predictions)

propensity_model_path = 'propensity.joblib'
joblib.dump(classifier, propensity_model_path)

artifacts = {
    "propensity_model": propensity_model_path,
    "feature_store": "feature_store.yaml"
}

The model will use the online Feast features from Redis as well as additional features from the request, thus we need to wrap the MLflow model and define it:

import mlflow.pyfunc
class PropensityWrapper(mlflow.pyfunc.PythonModel):

    def load_context(self, context):
        import joblib
        from feast import FeatureStore
        import pandas as pd 
        import os

        self.model = joblib.load(context.artifacts["propensity_model"])
        self.store = FeatureStore(repo_path=os.environ.get("FEAST_REPO_PATH"))

    def predict(self, context, model_input):
        users=list(model_input.to_dict()["UserID"].values())

        feature_vector = self.store.get_online_features(
            feature_refs=[
                'propensity_data:basket_icon_click',
                'propensity_data:basket_add_list',
                'propensity_data:basket_add_detail',
                'propensity_data:sort_by',
                'propensity_data:image_picker',
                'propensity_data:account_page_click',
                'propensity_data:promo_banner_click',
                'propensity_data:detail_wishlist_add',
                'propensity_data:list_size_dropdown',
                'propensity_data:closed_minibasket_click',
                'propensity_data:checked_delivery_detail',
                'propensity_data:checked_returns_detail',
                'propensity_data:sign_in',
                'propensity_data:saw_checkout',
                'propensity_data:saw_sizecharts',
                'propensity_data:saw_delivery',
                'propensity_data:saw_account_upgrade',
                'propensity_data:saw_homepage',
                'propensity_data:returning_user',
                'propensity_data:loc_uk'
            ],
            entity_rows=[{"UserID": uid} for uid in users]
        ).to_dict()

        data=pd.DataFrame.from_dict(feature_vector)
        merged_data = pd.merge(model_input,data, how="inner", on=["UserID"], suffixes=('_x', '')).drop(['UserID'], axis=1)
        return self.model.predict(merged_data)

Now we can log the MLflow model to the repository:

import warnings
import sys

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
import mlflow
import mlflow.sklearn
import mlflow.pyfunc

conda_env=mlflow.pyfunc.get_default_conda_env()

with mlflow.start_run():

    #mlflow.log_param("var_smoothing", input_params['var_smoothing'])
    mlflow.log_metric("accuracy_score", ac_score)

    tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

    if tracking_url_type_store != "file":
        mlflow.pyfunc.log_model("model",
                                 registered_model_name="propensity_model",
                                 python_model=PropensityWrapper(),
                                 artifacts=artifacts,
                                 conda_env=conda_env)
    else:
        mlflow.pyfunc.log_model("model",
                                 python_model=PropensityWrapper(),
                                 artifacts=artifacts,
                                 conda_env=conda_env)

We can export the code and run it using the MLflow CLI:

mlflow run . --no-conda --experiment-name="propensity" -P var_smoothing=1e-9

Now we need to materialize the features to Redis:

feast materialize 2021-03-22T23:42:00 2021-06-22T23:42:00
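
The same materialization can also be triggered from Python, e.g. in the notebook (a sketch; the date range mirrors the CLI call above):

# Materialize the offline features into the Redis online store
# (the Python equivalent of the `feast materialize` call above).
from datetime import datetime
from feast import FeatureStore

store = FeatureStore(repo_path=".")
store.materialize(
    start_date=datetime(2021, 3, 22, 23, 42),
    end_date=datetime(2021, 6, 22, 23, 42),
)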

Using MLflow we can simply deploy the model as a microservice in k8s.
In our case we want to deploy the model models:/propensity_model/Production,
which is currently assigned to the Production stage. During startup MLflow will automatically fetch the proper model from S3:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-serving
  namespace: qooba
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow-serving
      version: v1
  template:
    metadata:
      labels:
        app: mlflow-serving
        version: v1
    spec:
      containers:
      - image: qooba/mlflow:serving
        imagePullPolicy: IfNotPresent
        name: mlflow-serving
        env:
        - name: MLFLOW_TRACKING_URI
          value: http://mlflow.qooba.svc.cluster.local:5000
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: minio-auth
              key: username
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: minio-auth
              key: password
        - name: MLFLOW_S3_ENDPOINT_URL
          value: http://minio.qooba.svc.cluster.local:9000
        - name: FEAST_S3_ENDPOINT_URL
          value: http://minio.qooba.svc.cluster.local:9000
        - name: REDIS_TYPE
          value: REDIS
        - name: REDIS_CONNECTION_STRING
          value: redis.qooba.svc.cluster.local:6379,db=0
        - name: FEAST_TELEMETRY
          value: "false"
        - name: FEAST_REPO_PATH
          value: /feast_repository
        - name: PORT
          value: "5000"
        - name: MODEL
          value: models:/propensity_model/Production
        ports:
        - containerPort: 5000
        volumeMounts:
          - mountPath: /feast_repository
            name: config
      volumes:
        - name: config
          configMap:
            name: mlflow-serving
            items:
            - key: feature_store
              path: feature_store.yaml

Now we can call the model with an HTTP request:

import requests
import json

url="http://mlflow-serving.qooba.svc.cluster.local:5000/invocations"

headers={
    'Content-Type': 'application/json; format=pandas-records'
}

data=[
    {"UserID": "a720-6b732349-a720-4862-bd21-644732",
     'propensity_data:device_mobile': 1.0,
     'propensity_data:device_computer': 0.0,
     'propensity_data:device_tablet': 0.0
    }
]

response=requests.post(url, data=json.dumps(data), headers=headers)
response.text

The model will fetch the client features (based on UserID) from Redis, combine them with the features sent in the HTTP request, and generate a prediction.

Flink with AI – how to use Flink with MLflow model in Jupyter Notebook

squirrel

In this article I will show how to process streams with Apache Flink and an MLflow model.

Before you continue reading, please watch the short introduction:

Apache Flink allows for an efficient and scalable way of processing streams. It is a distributed processing engine which supports multiple sources like Kafka, NiFi and many others
(and if we need a custom one, we can create it ourselves).

Apache Flink also provides a framework for defining stream operations in languages like
Java, Scala, Python and SQL.

To simplify such definitions we can use Jupyter Notebook as an interface. Of course we can write in Python using the PyFlink library, but we can make it even easier by writing a Jupyter notebook extension (“magic words”).

Using the Flink extension (magic.ipynb) we can use Flink SQL syntax directly in Jupyter Notebook.

To use the extension we need to load it:

%reload_ext flinkmagic

Then we need to initialize the Flink StreamEnvironment:

%flink_init_stream_env

Now we can use SQL code, for example:

FileSystem connector:

%%flink_execute_sql
CREATE TABLE MySinkTable (
    word varchar,
    cnt bigint) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/opt/flink/notebooks/data/word_count_output1')

MySQL connector:

%%flink_execute_sql
CREATE TABLE MySinkDbSmsTable (
    smstext varchar,
    smstype varchar) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://mysql:3306/test',
        'connector.table' = 'sms',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.write.flush.interval' = '10',
        'connector.username' = 'root',
        'connector.password' = 'my-secret-pw')

Kafka connector:

%%flink_execute_sql
CREATE TABLE MySourceKafkaTable (word varchar) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = 'kafka:9092',
    'connector.properties.group.id' = 'test',
    'format.type' = 'csv'
        )

The magic keyword will automatically execute the SQL in the existing StreamEnvironment.

Now we can apply the machine learning model. In plain Flink we can use a UDF function defined
in Python, but we will use an MLflow model which wraps the ML frameworks (like PyTorch, TensorFlow, scikit-learn etc.). Because MLflow exposes a homogeneous interface, we can
create another “Jupyter magic” which will automatically load the MLflow model as a Flink function:

%flink_mlflow "SPAM_CLASSIFIER" "/mlflow/mlruns/2/64a89b0a6b7346498316bfae4c298535/artifacts/model" "[DataTypes.STRING()]" "DataTypes.STRING()"
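
Under the hood such a magic does little more than load the pyfunc model and register it as a PyFlink UDF. The sketch below only illustrates the idea (it assumes PyFlink ~1.11, that the StreamTableEnvironment created by %flink_init_stream_env is available as st_env, and that the model accepts a single-column DataFrame; the exact input format depends on the model signature):

# Illustrative sketch of what the %flink_mlflow magic could do internally:
# load an MLflow pyfunc model and register it as a PyFlink scalar UDF.
import mlflow.pyfunc
import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udf

def register_mlflow_udf(st_env, name, model_uri, input_types, result_type):
    model = mlflow.pyfunc.load_model(model_uri)

    @udf(input_types=input_types, result_type=result_type)
    def predict(*args):
        # wrap the scalar arguments into a one-row DataFrame for the pyfunc model
        return str(model.predict(pd.DataFrame([args]))[0])

    st_env.register_function(name, predict)

register_mlflow_udf(st_env, "SPAM_CLASSIFIER",
                    "/mlflow/mlruns/2/64a89b0a6b7346498316bfae4c298535/artifacts/model",
                    [DataTypes.STRING()], DataTypes.STRING())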

Now we can simply write a Flink SQL query:

%%flink_sql_query
SELECT word as smstext, SPAM_CLASSIFIER(word) as smstype FROM MySourceKafkaTable

which in our case will fetch Kafka events and classify them using the MLflow spam classifier. The
results will be displayed in real time in the Jupyter Notebook as an events DataFrame.

If we want, we can simply use other Python libraries (like matplotlib and others) to create a
graphical representation of the results, e.g. a pie chart.

You can find the whole code, including the Flink examples, the extension and the Dockerfiles, here:
https://github.com/qooba/flink-with-ai.

You can also use the docker image qooba/flink:dev to test and run the notebooks inside.
Please check run.sh,
where you have all the components (Kafka, MySQL, Jupyter with Flink, the MLflow repository).

Animated Art with AI – face reenactment in action

faces

In this article I will show how to use artificial intelligence to add motion to images and photos.

Before you continue reading, please watch the short introduction:

Face reenactment

To bring photos to life we can use a face reenactment algorithm designed to transfer the facial movements from a video to another image.

face reenactment diagram

In this project I have used the GitHub implementation: https://github.com/AliaksandrSiarohin/first-order-model. An extensive description of the neural network architecture can be found in this paper. The solution consists of two parts: a motion module and a generation module.
The motion module at the first stage extracts the key points from the source and target image. In fact, the solution assumes that a reference image exists which relates to both the source and target images, so at the first stage the transformations from the reference image to the source image (T_{S \leftarrow R}(p_k)) and to the target image (T_{T \leftarrow R}(p_k)) are calculated. Then the first order Taylor expansions \frac{d}{dp}T_{S \leftarrow R}(p)|_{p=p_k} and \frac{d}{dp}T_{T \leftarrow R}(p)|_{p=p_k} are used to calculate the dense motion field.
The generation module uses the calculated dense motion field and the source image to generate a new image that resembles the target image.
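
Combining the two expansions (in the notation of the paper), the motion from the target to the source frame near each keypoint p_k is approximated as:

T_{S \leftarrow T}(z) \approx T_{S \leftarrow R}(p_k) + J_k \left(z - T_{T \leftarrow R}(p_k)\right), \qquad J_k = \frac{d}{dp}T_{S \leftarrow R}(p)\Big|_{p=p_k} \left(\frac{d}{dp}T_{T \leftarrow R}(p)\Big|_{p=p_k}\right)^{-1}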

face reenactment diagram

The whole solution is packed into a docker image, thus we can simply reproduce the results using the command:

docker run -it --rm --gpus all -v $(pwd)/torch_models:/root/.torch/models -v $(pwd)/checkpoints:/ai/checkpoints -v $(pwd)/test:/ai/test qooba/aifacereeanactment python3 ./prepare.py --source_image /ai/test/test.jpg --driving_video /ai/test/test.mp4 --output /ai/test/test_generated.mp4

NOTE: additional volumes (torch_models and checkpoints) are mounted because during the first run the trained neural networks are downloaded.

To reproduce the results we need to provide two files: the driving (motion) video and the source image. In the above example I put them into the test directory and mount it into the docker container (-v $(pwd)/test:/ai/test) to use them inside it.

Below you have all command line options:

usage: prepare.py [-h] [--config CONFIG] [--checkpoint CHECKPOINT]
                  [--source_image SOURCE_IMAGE]
                  [--driving_video DRIVING_VIDEO] [--crop_image]
                  [--crop_image_padding CROP_IMAGE_PADDING [CROP_IMAGE_PADDING ...]]
                  [--crop_video] [--output OUTPUT] [--relative]
                  [--no-relative] [--adapt_scale] [--no-adapt_scale]
                  [--find_best_frame] [--best_frame BEST_FRAME] [--cpu]

first-order-model

optional arguments:
  -h, --help            show this help message and exit
  --config CONFIG       path to config
  --checkpoint CHECKPOINT
                        path to checkpoint to restore
  --source_image SOURCE_IMAGE
                        source image
  --driving_video DRIVING_VIDEO
                        driving video
  --crop_image, -ci     autocrop image
  --crop_image_padding CROP_IMAGE_PADDING [CROP_IMAGE_PADDING ...], -cip CROP_IMAGE_PADDING [CROP_IMAGE_PADDING ...]
                        autocrop image paddings left, upper, right, lower
  --crop_video, -cv     autocrop video
  --output OUTPUT       output video
  --relative            use relative or absolute keypoint coordinates
  --no-relative         don't use relative or absolute keypoint coordinates
  --adapt_scale         adapt movement scale based on convex hull of keypoints
  --no-adapt_scale      no adapt movement scale based on convex hull of
                        keypoints
  --find_best_frame     Generate from the frame that is the most alligned with
                        source. (Only for faces, requires face_aligment lib)
  --best_frame BEST_FRAME
                        Set frame to start from.
  --cpu                 cpu mode.

New Face with AI

faces

In this article I will show how to use artificial intelligence to generate human faces.

Before you continue reading, please watch the short introduction:

Generative adversarial network

To generate realistic human faces, we can use neural networks with GAN (Generative adversarial network) architecture.

neural network architecture

The GAN network consists of two parts: a Generator, whose task is to generate an image from random input, and a Discriminator, which checks whether the generated image is realistic.

training progress

During training, the networks compete with each other: the Generator tries to generate better and better images
and thereby deceive the Discriminator. On the other hand, the Discriminator learns to distinguish between real and generated photos.

To train the discriminator, we use both real photos and those generated by the generator.
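
To make this competition concrete, a single training step can be sketched as below (a minimal PyTorch sketch; a DCGAN-style generator and a discriminator returning one logit per image are assumed, and shapes, hyperparameters and device handling are simplified for brevity):

# A minimal single GAN training step (PyTorch sketch).
# `generator` and `discriminator` are assumed DCGAN-style modules: the generator
# maps a latent vector to an image, the discriminator returns one logit per image.
import torch
import torch.nn as nn

criterion = nn.BCEWithLogitsLoss()

def train_step(generator, discriminator, real_images, opt_g, opt_d, latent_dim=100):
    batch_size = real_images.size(0)
    real_labels = torch.ones(batch_size, 1)
    fake_labels = torch.zeros(batch_size, 1)

    # 1. Train the Discriminator on real photos and on generated (fake) images.
    noise = torch.randn(batch_size, latent_dim)
    fake_images = generator(noise).detach()      # detach: do not update the Generator here
    d_loss = criterion(discriminator(real_images), real_labels) + \
             criterion(discriminator(fake_images), fake_labels)
    opt_d.zero_grad()
    d_loss.backward()
    opt_d.step()

    # 2. Train the Generator to fool the Discriminator (fakes labeled as real).
    noise = torch.randn(batch_size, latent_dim)
    g_loss = criterion(discriminator(generator(noise)), real_labels)
    opt_g.zero_grad()
    g_loss.backward()
    opt_g.step()

    return d_loss.item(), g_loss.item()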

Finally, we can achieve the following results using a DCGAN network.
As you can see, some faces look realistic while others are distorted; additionally, the network can only generate low resolution images.

training results

We can achieve much better results using the StyleGAN network (arxiv article), which, among other things, differs in that the subsequent layers of the network are added progressively during training.

I generated the images using pretrained networks and the effect is really amazing.

results stylegan

Unblur low quality face images with AI

clay

In this article I will show how to improve the quality of blurred face images using
artificial intelligence. For this purpose I will use neural networks and the FastAI library (ver. 1).

The project code is available on my github: https://github.com/qooba/aiunblur
You can also use the ready docker image: https://hub.docker.com/repository/docker/qooba/aiunblur

Before you continue reading, please watch the short introduction:

This work is heavily based on the fast.ai course, thus I definitely recommend going through it.

Data

To teach the neural network how to rebuild face images we need to provide a
faces dataset which shows how low quality and blurred images should be reconstructed.
Thus we need pairs of low and high quality images.

To prepare the dataset we can use available faces datasets, e.g. FFHQ, Tufts Face Database, CelebA.

We will treat the original images as the high resolution data and rescale them
to prepare the low resolution input:

import fastai
from fastai.vision import *
from fastai.callbacks import *
from fastai.utils.mem import *
from torchvision.models import vgg16_bn
from pathlib import Path

path = Path('/opt/notebooks/faces')
path_hr = path/'high_resolution'
path_lr = path/'small-96'

il = ImageList.from_folder(path_hr)

def resize_one(fn, i, path, size):
    dest = path/fn.relative_to(path_hr)
    dest.parent.mkdir(parents=True, exist_ok=True)
    img = PIL.Image.open(fn)
    targ_sz = resize_to(img, size, use_min=True)
    img = img.resize(targ_sz, resample=PIL.Image.BILINEAR).convert('RGB')
    img.save(dest, quality=60)

sets = [(path_lr, 96)]
for p,size in sets:
    if not p.exists(): 
        print(f"resizing to {size} into {p}")
        parallel(partial(resize_one, path=p, size=size), il.items)

Now we can create a DataBunch for training:

bs,size=32,128
arch = models.resnet34
src = ImageImageList.from_folder(path_lr).split_by_rand_pct(0.1, seed=42)

def get_data(bs,size):
    data = (src.label_from_func(lambda x: path_hr/x.name)
           .transform(get_transforms(max_zoom=2.), size=size, tfm_y=True)
           .databunch(bs=bs,num_workers=0).normalize(imagenet_stats, do_y=True))

    data.c = 3
    return data

data = get_data(bs,size)

Training

In this solution we will use a neural network with the U-Net architecture.

neural network architecture

The U-Net neural network contains two parts, an Encoder and a Decoder, which are used to reconstruct the face image.
During the first stage the Encoder fetches the input, then extracts and aggregates the image features. At each stage the feature maps are downsampled.
Then the Decoder uses the extracted features and tries to rebuild the image, upsampling it at each decoding stage. Finally we get the regenerated image.

Additionally, we need to define the loss function, which will tell the model whether the image was rebuilt correctly and allow it to train.

To do this we will use an additional neural network, VGG-16. We will feed the generated image and the original image (which is our target) to the network input. Then we will compare the features extracted for both images at selected layers and calculate the loss accordingly.

Finally we will use the Adam optimizer to minimize the loss and achieve a better result.

def gram_matrix(x):
    n,c,h,w = x.size()
    x = x.view(n, c, -1)
    return (x @ x.transpose(1,2))/(c*h*w)

base_loss = F.l1_loss

vgg_m = vgg16_bn(True).features.cuda().eval()
requires_grad(vgg_m, False)

blocks = [i-1 for i,o in enumerate(children(vgg_m)) if isinstance(o,nn.MaxPool2d)]

class FeatureLoss(nn.Module):
    def __init__(self, m_feat, layer_ids, layer_wgts):
        super().__init__()
        self.m_feat = m_feat
        self.loss_features = [self.m_feat[i] for i in layer_ids]
        self.hooks = hook_outputs(self.loss_features, detach=False)
        self.wgts = layer_wgts
        self.metric_names = ['pixel',] + [f'feat_{i}' for i in range(len(layer_ids))
              ] + [f'gram_{i}' for i in range(len(layer_ids))]

    def make_features(self, x, clone=False):
        self.m_feat(x)
        return [(o.clone() if clone else o) for o in self.hooks.stored]

    def forward(self, input, target):
        out_feat = self.make_features(target, clone=True)
        in_feat = self.make_features(input)
        self.feat_losses = [base_loss(input,target)]
        self.feat_losses += [base_loss(f_in, f_out)*w
                             for f_in, f_out, w in zip(in_feat, out_feat, self.wgts)]
        self.feat_losses += [base_loss(gram_matrix(f_in), gram_matrix(f_out))*w**2 * 5e3
                             for f_in, f_out, w in zip(in_feat, out_feat, self.wgts)]
        self.metrics = dict(zip(self.metric_names, self.feat_losses))
        return sum(self.feat_losses)

    def __del__(self): self.hooks.remove()

feat_loss = FeatureLoss(vgg_m, blocks[2:5], [5,15,2])

wd = 1e-3  # weight decay for the learner (value used in the fast.ai course)
learn = unet_learner(data, arch, wd=wd, loss_func=feat_loss, callback_fns=LossMetrics,
                     blur=True, norm_type=NormType.Weight)

Results

After training we can use the model to regenerate the images:

results

Application

Finally we can export the model and create a drag and drop application which fixes face images in a web application.
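
A minimal export and inference sketch with fastai v1 could look as follows (assuming the notebook context above, with learn being the trained unet_learner; file names are illustrative):

# A sketch of exporting the trained learner and running inference (fastai v1).
# Assumes the training notebook above (from fastai.vision import *) and that
# `learn` is the trained unet_learner; file names are illustrative.
learn.export('unblur.pkl')                       # serializes the model and the data pipeline

inference_learn = load_learner(learn.path, 'unblur.pkl')
prediction, _, _ = inference_learn.predict(open_image('blurred_face.jpg'))
prediction.save('restored_face.jpg')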

results

The whole solution is packed into docker images, thus you can simply start it using the commands:

# with GPU
docker run -d --gpus all --rm -p 8000:8000 --name aiunblur qooba/aiunblur

# without GPU
docker run -d --rm -p 8000:8000 --name aiunblur qooba/aiunblur

To use the GPU, additional NVIDIA drivers (included in the NVIDIA CUDA Toolkit) are needed.

Fly AI with Tello drone

balloons

The popularity of drones and the range of their applications grows every year.

In this article I will show how to programmatically control the Tello Ryze drone, capture the camera video and detect objects using TensorFlow. I have packed the whole solution into docker images (the backend and the Web App UI are in separate images), thus you can simply run it.

The project code is available on my GitHub: https://github.com/qooba/aidrone
You can also use the ready docker image: https://hub.docker.com/repository/docker/qooba/aidrone

Before you continue reading, please watch the short introduction:

Architecture

architecture diagram

The application will use two network interfaces.
The first will be used by the Python backend to connect to the Tello Wi-Fi, send the commands and capture the video stream. In the backend layer I have used the DJITelloPy library, which covers all required Tello move commands and video stream capture.
To efficiently show the video stream in the browser I have used the WebRTC protocol and the aiortc library. Finally, I have used TensorFlow 2.0 object detection with a pretrained SSD ResNet50 model.
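
A minimal DJITelloPy session looks roughly like this (a sketch; it assumes the machine is already connected to the Tello Wi-Fi and the move distances are illustrative):

# Control the Tello and grab a camera frame with DJITelloPy (sketch).
from djitellopy import Tello

tello = Tello()
tello.connect()                        # switches the drone into SDK mode
print(tello.get_battery())             # quick sanity check

tello.streamon()                       # video stream on UDP port 11111
frame = tello.get_frame_read().frame   # numpy BGR frame, ready for object detection

tello.takeoff()
tello.move_up(50)                      # distances in centimeters
tello.rotate_clockwise(90)
tello.land()

tello.streamoff()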

The second network interface will be used to expose the Vue web application.
I have used nginx to serve the frontend application.

Application

drone controls

Using the web interface you can control the Tello's movement, where you can:
* start video stream
* stop video stream
* takeoff – which starts Tello flight
* land
* up
* down
* rotate left
* rotate right
* forward
* backward
* left
* right

In addition, using the draw detection switch you can turn the detection boxes on the captured video stream on/off (however this introduces a delay in the video, thus it is turned off by default). Additionally, I send the list of detected classes through web sockets, and they are also displayed.

drone detection

As mentioned before, I have used a pretrained model, thus it is a good idea to train your own model to get better results for a narrower and more specific class of objects.

Finally, the whole solution is packed into docker images, thus you can simply start it using the commands:

docker network create -d bridge app_default
docker run --name tello --network app_default --gpus all -d --rm -p 8890:8890 -p 8080:8080 -p 8888:8888 -p 11111:11111/udp  qooba/aidrone /bin/bash -c "python3 drone.py"
docker run -d --rm --network app_default --name nginx -p 80:80 -p 443:443 qooba/aidrone:front

To use the GPU, additional NVIDIA drivers (included in the NVIDIA CUDA Toolkit) are needed.

Ops … I did it again – MLOps with Kubeflow, MLflow

gears

Machine learning is one of the hottest areas nowadays. New algorithms and models are widely used in commercial solutions, thus the whole ML process, as a software development and deployment process, needs to be optimized.

Kubeflow is an open source platform which allows you to build a complete multi-user analytical environment. It is set up on Kubernetes, thus it can be simply installed on a public cloud, an on-premise Kubernetes cluster or your workstation.

On the other hand, MLflow is a platform which can be run as a standalone application. It doesn't require Kubernetes, thus the setup is much simpler than Kubeflow's, but it doesn't support multi-user/multi-team separation.

In this article we will use Kubeflow and MLflow to build an isolated workspace and MLOps pipelines for analytical teams.

Currently we use the Kubeflow platform at @BankMillennium to build AI solutions and conduct the MLOps process, and this article is inspired by the experience gained while launching and using the platform.

Before you continue reading, please watch the short introduction:

AI Platform

The core of the platform will be set up using Kubeflow (version 1.0.1) on Kubernetes (v1.17.0). The Kubernetes cluster was set up using Rancher RKE, which simplifies the installation.

kubeflow main

Kubeflow gives a complete multi-user/multi-team analytical environment with: authentication (dex), a Jupyter notebook workspace, pipelines, a metadata store, an artifact store, and model deployment engines (KFServing, Seldon).

kubeflow notebooks

Namespace isolation

By default the user namespaces are isolated in the Kubeflow UI, but in fact they are not isolated at all.

The ServiceRoleBinding configuration is very naive and checks only the kubeflow-userid header for RBAC access.

apiVersion: rbac.istio.io/v1alpha1
kind: ServiceRoleBinding
metadata:
  annotations:
    role: admin
    user: admin@kubeflow.org
  namespace: qooba
  ownerReferences:
  - apiVersion: kubeflow.org/v1
    blockOwnerDeletion: true
    controller: true
    kind: Profile
    name: qooba
    uid: 400b5e7b-4b58-40e7-8613-7b0ef01a55ba
spec:
  roleRef:
    kind: ServiceRole
    name: ns-access-istio
  subjects:
  - properties:
      request.headers[kubeflow-userid]: admin@kubeflow.org

Thus we can simply access a notebook in another namespace from a notebook in a different namespace by setting the kubeflow-userid header:

import requests
url='http://{{notebook_name}}.{{namespace}}.svc.cluster.local'

headers={    
        'kubeflow-userid': "admin@kubeflow.org"
}

requests.get(url,headers=headers).text

To fix this we can set up appropriate Kubernetes NetworkPolicies, e.g.:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-ingress-default
  namespace: {{namespace}}
spec:
  podSelector: {}
  ingress:
  - from:
    - namespaceSelector:
        matchExpressions:
          - {key: namespace, operator: In, values: [{{namespace}}, kubeflow, istio-system, kube-system]}
  policyTypes:
  - Ingress
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: deny-egress-all
  namespace: {{namespace}}
spec:
  podSelector:
    matchLabels: {}
  policyTypes:
  - Egress
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-egress-dns
  namespace: {{namespace}}
spec:
  podSelector:
    matchLabels: {}
  policyTypes:
  - Egress
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          namespace: kube-system
    ports:
    - protocol: UDP
      port: 53
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-egress-istio
  namespace: {{namespace}}
spec:
  podSelector:
    matchLabels: {}
  policyTypes:
  - Egress
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          namespace: istio-system
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-egress-kubeflow
  namespace: {{namespace}}
spec:
  podSelector:
    matchLabels: {}
  policyTypes:
  - Egress
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          namespace: kubeflow
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-egress-internal
  namespace: {{namespace}}
spec:
  podSelector:
    matchLabels: {}
  policyTypes:
  - Egress
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          namespace: {{namespace}}

Isolated model registry

By default Kubeflow is equipped with a metadata and artifact store shared between namespaces, which makes it difficult to secure and organize spaces for teams. To fix this we will set up a separate MLflow Tracking Server and Model Registry for each team namespace.

MLflow docker image qooba/mlflow:

FROM continuumio/miniconda3
RUN apt update && apt install python3-mysqldb default-libmysqlclient-dev  -yq
RUN pip install mlflow sklearn jupyterlab watchdog[watchmedo] boto3
RUN conda install pymysql
ENV NB_PREFIX /
CMD ["sh","-c", "jupyter notebook --notebook-dir=/home/jovyan --ip=0.0.0.0 --no-browser --allow-root --port=8888 --NotebookApp.token='' --NotebookApp.password='' --NotebookApp.allow_origin='*' --NotebookApp.base_url=${NB_PREFIX}"]

mlflow.yaml:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: mlflow-pv-claim
  namespace: qooba
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: managed-nfs-storage
  volumeMode: Filesystem
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: mlflow
  namespace: qooba
---
apiVersion: v1
kind: Service
metadata:
  name: mlflow
  namespace: qooba
  labels:
    app: mlflow
spec:
  ports:
  - name: http
    port: 5000
    targetPort: 5000
  selector:
    app: mlflow
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow
  namespace: qooba
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow
      version: v1
  template:
    metadata:
      labels:
        app: mlflow
        version: v1
    spec:
      serviceAccountName: mlflow
      containers:
      - image: qooba/mlflow
        imagePullPolicy: IfNotPresent
        name: mlflow
        command: ["mlflow","server","-h","0.0.0.0","--backend-store-uri","sqlite:///mlflow/mlflow.db","--default-artifact-root","s3://mlflow/mlruns"]]
        #command: ["mlflow","server","-h","0.0.0.0","--backend-store-uri","mysql+pymysql:///mlflow/mlflow.db","--default-artifact-root","s3://mlflow/mlruns"]]
        #command: ["mlflow","server","-h","0.0.0.0","--backend-store-uri","sqlite:///mlflow/mlflow.db","--default-artifact-root","/mlflow/mlruns"]]
        env:
        - name: AWS_ACCESS_KEY_ID
          value: minio
        - name: AWS_SECRET_ACCESS_KEY
          value: minio123
        - name: MLFLOW_S3_ENDPOINT_URL
          value: http://minio.qooba.svc.cluster.local:9000
        ports:
        - containerPort: 5000
        volumeMounts:
          - mountPath: /mlflow
            name: mlflow
          - mountPath: /dev/shm
            name: dshm
      volumes:
        - name: mlflow
          persistentVolumeClaim:
            claimName: mlflow-pv-claim
        - emptyDir:
            medium: Memory
          name: dshm
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: mlflow
  namespace: qooba
spec:
  hosts:
  - "*"
  gateways:
  - qooba/mlflow-gateway
  http:
  - match:
    - uri:
        prefix: /
    rewrite:
        uri: /
    route:
    - destination:
        port:
          number: 5000
        host: mlflow
---
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: mlflow-gateway
  namespace: qooba
spec:
  selector:
    istio: ingressgateway
  servers:
  - hosts:
    - '*'
    port:
      name: http
      number: 5000
      protocol: HTTP
---
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: mlflow-filter
  namespace: istio-system
spec:
  filters:
  - filterConfig:
      httpService:
        authorizationRequest:
          allowedHeaders:
            patterns:
            - exact: cookie
            - exact: X-Auth-Token
        authorizationResponse:
          allowedUpstreamHeaders:
            patterns:
            - exact: kubeflow-userid
        serverUri:
          cluster: outbound|8080||authservice.istio-system.svc.cluster.local
          failureModeAllow: false
          timeout: 10s
          uri: http://authservice.istio-system.svc.cluster.local
      statusOnError:
        code: GatewayTimeout
    filterName: envoy.ext_authz
    filterType: HTTP
    insertPosition:
      index: FIRST
    listenerMatch:
      listenerProtocol: HTTP
      listenerType: GATEWAY
      portNumber: 5000
  workloadLabels:
    istio: ingressgateway
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: dex-mlflow
  namespace: auth
spec:
  gateways:
  - qooba/mlflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /dex/
    route:
    - destination:
        host: dex.auth.svc.cluster.local
        port:
          number: 5556

Additionally, we have to edit the istio gateway and add the mlflow port to access the MLflow UI:

kubectl edit svc istio-ingressgateway -n istio-system

and add:

spec:
  ports:
  ...
  - name: mlflow
    nodePort: 31382
    port: 5000
    protocol: TCP
    targetPort: 5000

The MLflow repository can be accessed from the web browser:
mlflow repository

Additionally, we have to mount the PersistentVolume mlflow-pv-claim to the user notebook, where we will store the training artifacts:

kubectl edit Notebook -n qooba sklearn
apiVersion: kubeflow.org/v1
kind: Notebook
metadata:
  labels:
    app: sklearn
  name: sklearn
  namespace: qooba
spec:
  template:
    spec:
      containers:
      - env: []
        image: qooba/mlflow
        name: sklearn
        resources:
          requests:
            cpu: "0.5"
            memory: 1.0Gi
        volumeMounts:
        - mountPath: /home/jovyan
          name: workspace-sklearn
        - mountPath: /mlflow
          name: mlflow
        - mountPath: /dev/shm
          name: dshm
      serviceAccountName: default-editor
      ttlSecondsAfterFinished: 300
      volumes:
      - name: workspace-sklearn
        persistentVolumeClaim:
          claimName: workspace-sklearn
      - name: mlflow
        persistentVolumeClaim:
          claimName: mlflow-pv-claim
      - emptyDir:
          medium: Memory
        name: dshm  

Now analysts can log models and metrics from the Jupyter notebook workspace
(code example from https://www.mlflow.org/docs/latest/tutorials-and-examples/tutorial.html):

import os
import warnings
import sys
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
import mlflow
import mlflow.sklearn

import logging

remote_server_uri='http://mlflow:5000'
mlflow.set_tracking_uri(remote_server_uri)

mlflow.set_experiment("/my-experiment2")


logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)

def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2

warnings.filterwarnings("ignore")
np.random.seed(40)

# Read the wine-quality csv file from the URL
csv_url = (
    "./winequality-red.csv"
)
try:
    data = pd.read_csv(csv_url, sep=";")
except Exception as e:
    logger.exception(
        "Unable to download training & test CSV, check your internet connection. Error: %s", e
    )

train, test = train_test_split(data)

train_x = train.drop(["quality"], axis=1)
test_x = test.drop(["quality"], axis=1)
train_y = train[["quality"]]
test_y = test[["quality"]]

alpha = 0.5
l1_ratio = 0.5


with mlflow.start_run():
    lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
    lr.fit(train_x, train_y)

    predicted_qualities = lr.predict(test_x)

    (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

    print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
    print("  RMSE: %s" % rmse)
    print("  MAE: %s" % mae)
    print("  R2: %s" % r2)

    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

    tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

    if tracking_url_type_store != "file":
        mlflow.sklearn.log_model(lr, "model", registered_model_name="ElasticnetWineModel2")
    else:
        mlflow.sklearn.log_model(lr, "model")

I definitely recommend using git-versioned MLflow projects instead of running code directly from Jupyter, because
the MLflow model registry will keep the git commit hash used for the run, which will help to reproduce the results.

MLOps

mlops diagram

Now I’d like to propose the process of building and deploying ML models.

Training

As described before, the model is prepared and trained by the analyst, who works in the Jupyter workspace and logs metrics and the model to the MLflow tracking server and model registry.

MLflow UI

A senior analyst (currently MLflow doesn't support role assignment) checks the model metrics and decides to promote it to the Staging/Production stage in the MLflow UI.

Model promotion
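
The same promotion can also be done programmatically with the MLflow client (a sketch; the model name and version are illustrative):

# Promote a registered model version to Production programmatically
# (equivalent to the manual action in the MLflow UI).
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri="http://mlflow:5000")
client.transition_model_version_stage(
    name="ElasticnetWineModel2",
    version="1",
    stage="Production",
)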

We will create an additional application which will track the changes in the MLflow registry and initialize the deployment process.

On each MLflow registry change, the Python application will check the database, prepare and commit the k8s deployments, and upload the model artifacts to Minio.

Because the application commits the deployments to a git repository, we need to generate ssh keys:

ssh-keygen

and store them as a secret:

kubectl create secret generic ssh-key-secret --from-file=id_rsa=./id_rsa --from-file=id_rsa.pub=./id_rsa.pub -n qooba

Now we can deploy the application:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflowwatch
  namespace: qooba
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflowwatch
      version: v1
  template:
    metadata:
      labels:
        app: mlflowwatch
        version: v1
    spec:
      containers:
      - image: qooba/mlflow:watchdog
        imagePullPolicy: IfNotPresent
        name: mlflowwatch
        command: ["/mlflow/start-watch.sh"]
        env:
        - name: GIT_REPO_URL
          value: ...
        - name: GIT_REPO_IP
          value: ...
        - name: BUCKET_NAME
          value: qooba
        - name: AWS_ACCESS_KEY_ID
          value: minio
        - name: AWS_SECRET_ACCESS_KEY
          value: minio123
        - name: MLFLOW_S3_ENDPOINT_URL
          value: http://minio.qooba.svc.cluster.local:9000
        ports:
        - containerPort: 5000
        volumeMounts:
          - mountPath: /mlflow
            name: mlflow
          - mountPath: /dev/shm
            name: dshm
          - mountPath: /etc/ssh-key
            name: ssh-key-secret-volume
            readOnly: true
      volumes:
        - name: mlflow
          persistentVolumeClaim:
            claimName: mlflow
        - emptyDir:
            medium: Memory
          name: dshm
        - name: ssh-key-secret-volume
          secret:
            defaultMode: 256
            secretName: ssh-key-secret

start-watch.sh:

#!/bin/bash
watchmedo shell-command --patterns='*.db' --recursive --wait --command='/mlflow/watch.sh' /mlflow

watch.sh:

#!/bin/bash
cd /mlflow

if [ ! -d "/root/.ssh" ] 
then
  cp -r /etc/ssh-key /root/.ssh
  chmod -R 700 /root/.ssh

  ssh-keygen -R $GIT_REPO_URL
  ssh-keygen -R $GIT_REPO_IP
  ssh-keygen -R $GIT_REPO_URL,$GIT_REPO_IP
  ssh-keyscan -H $GIT_REPO_URL,$GIT_REPO_IP >> ~/.ssh/known_hosts
  ssh-keyscan -H $GIT_REPO_IP >> ~/.ssh/known_hosts
  ssh-keyscan -H $GIT_REPO_URL >> ~/.ssh/known_hosts

  git config --global user.name "mlflowwatch"
  git config --global user.email "mlflowwatch@qooba.net"
  git branch --set-upstream-to=origin/master master

fi

python3 /mlflow/watch.py

git add .
git commit -a -m "mlflow autocommit"
git push origin master

watch.py:

import os
import jinja2
import sqlite3
from collections import defaultdict
import boto3
import botocore

class Watcher:

    def __init__(self):
        self._model_deployment=ModelDeployment()
        self._model_registry=ModelRegistry()
        self._model_store=ModelStore()


    def process(self):
        model_groups = self._model_registry.models_info()
        for model_name, models_data in model_groups.items():
            print(f'{model_name}:')
            for model_data in models_data:
                print(f"- stage: {model_data['stage']}")
                print(f"  path: {model_data['path']}")
                self._model_deployment.generate_deployment(model_name, model_data)
                self._model_store.upload_model(model_data)


class ModelDeployment:

    def __init__(self):
        self._create_dir('deployments')
        self._template=self._prepare_template()

    def generate_deployment(self, model_name, model_data):
        self._create_dir(f'deployments/{model_name}')
        stage = model_data['stage'].lower()
        path = model_data['path'].replace('/mlflow/mlruns','s3://qooba/mlflow')
        self._create_dir(f'deployments/{model_name}/{stage}')
        outputText = self._template.render(model=path)
        with open(f'deployments/{model_name}/{stage}/deployment.yaml','w') as f:
            f.write(outputText)

    def _create_dir(self, directory):    
        if not os.path.exists(directory):
            os.makedirs(directory)

    def _prepare_template(self):
        templateLoader = jinja2.FileSystemLoader(searchpath="./")
        templateEnv = jinja2.Environment(loader=templateLoader)
        return templateEnv.get_template("deployment.yaml")

class ModelRegistry:

    def __init__(self):
        self._conn = sqlite3.connect('/mlflow/mlflow.db')

    def models_info(self):
        models=self._conn.execute("SELECT distinct name, version, current_stage, source FROM model_versions where current_stage in ('Staging','Production') order by version desc;").fetchall()
        res=defaultdict(list)

        for s in models:
            res[s[0].lower()].append({"tag": str(s[1]), "stage": s[2], "path": s[3]})

        return dict(res)


class ModelStore:

    def __init__(self):
        self._bucket_name=os.environ['BUCKET_NAME']
        self._s3=self._create_s3_client()
        self._create_bucket(self._bucket_name)

    def upload_model(self, model_data):  
        path = model_data['path']
        s3_path = path.replace('/mlflow/mlruns','mlflow')
        try:
            self._s3.head_object(Bucket=self._bucket_name, Key=f'{s3_path}/MLmodel')
        except botocore.exceptions.ClientError as e:
            files = [(f'{path}/{f}',f'{s3_path}/{f}') for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
            for file in files:
                self._s3.upload_file(file[0], self._bucket_name, file[1])

    def _create_s3_client(self):
        return boto3.client('s3',
                  aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
                  aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
                  endpoint_url=os.environ["MLFLOW_S3_ENDPOINT_URL"])

    def _create_bucket(self, bucket_name):
        try:
            self._s3.head_bucket(Bucket=bucket_name)
        except botocore.exceptions.ClientError as e:
            self._s3.create_bucket(Bucket=bucket_name)


if __name__ == "__main__":
    Watcher().process()

The model deployments will be prepared using the template:
deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-t1
  namespace: qooba
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow-t1
      version: v1
  template:
    metadata:
      labels:
        app: mlflow-t1
        version: v1
    spec:
      containers:
      - image: qooba/mlflow:serving
        imagePullPolicy: IfNotPresent
        name: mlflow-t1
        env:
        - name: AWS_ACCESS_KEY_ID
          value: minio
        - name: AWS_SECRET_ACCESS_KEY
          value: minio123
        - name: MLFLOW_S3_ENDPOINT_URL
          value: http://minio.qooba.svc.cluster.local:9000
        - name: MODEL
          value: {{model}}
        ports:
        - containerPort: 5000
        volumeMounts:
          - mountPath: /dev/shm
            name: dshm
      volumes:
        - emptyDir:
            medium: Memory
          name: dshm

If the model is promoted to Staging/Production, the process prepares the deployment yaml and uploads the model to the S3 store.

We will use Minio as the S3 model store.

minio.yaml:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: minio
  namespace: qooba
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: minio-pv-claim
  namespace: qooba
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  storageClassName: managed-nfs-storage
  volumeMode: Filesystem
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: minio
    namespace: qooba
  name: minio
  namespace: qooba
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: minio
      namespace: qooba
  template:
    metadata:
      labels:
        app: minio
        namespace: qooba
    spec:
      serviceAccountName: minio
      containers:
      - args:
        - server
        - /data
        env:
        - name: MINIO_ACCESS_KEY
          value: minio
        - name: MINIO_SECRET_KEY
          value: minio123
        image: minio/minio:RELEASE.2018-02-09T22-40-05Z
        imagePullPolicy: IfNotPresent
        name: minio
        ports:
        - containerPort: 9000
          protocol: TCP
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /data
          name: data
          subPath: minio
      volumes:
      - name: data
        persistentVolumeClaim:
          claimName: minio-pv-claim
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: minio
    namespace: qooba
  name: minio
  namespace: qooba
spec:
  ports:
  - port: 9000
    protocol: TCP
    targetPort: 9000
  selector:
    app: minio
    namespace: qooba

ArgoCD

Now it is time to set up ArgoCD, which will sync the Git deployment changes with the Kubernetes configuration and automatically deploy newly promoted models.

argocd

To deploy MLflow models we will use the docker image

qooba/mlflow:serving

FROM continuumio/miniconda3
RUN pip install mlflow==1.11.0 cloudpickle==1.6.0 scikit-learn==0.23.2 gevent boto3
ENV GUNICORN_CMD_ARGS="--timeout 60 -k gevent"
WORKDIR /opt/mlflow
ENV PORT=5000
ENV WORKER_NUMBER=4
CMD mlflow models serve -m $MODEL -h 0.0.0.0 -p $PORT -w $WORKER_NUMBER --no-conda

and configuration:
mlflow.serving.yaml:

apiVersion: v1
kind: Service
metadata:
  name: mlflow-t1
  namespace: qooba
  labels:
    app: mlflow-t1
spec:
  ports:
  - name: http
    port: 5000
    targetPort: 5000
  selector:
    app: mlflow-t1
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-t1
  namespace: qooba
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow-t1
      version: v1
  template:
    metadata:
      labels:
        app: mlflow-t1
        version: v1
    spec:
      containers:
      - image: qooba/mlflow:serving
        imagePullPolicy: IfNotPresent
        name: mlflow-t1
        env:
        - name: AWS_ACCESS_KEY_ID
          value: minio
        - name: AWS_SECRET_ACCESS_KEY
          value: minio123
        - name: MLFLOW_S3_ENDPOINT_URL
          value: http://minio.qooba.svc.cluster.local:9000
        - name: MODEL
          value: s3://qooba/mlflow/1/e0167f65abf4429b8c58f56b547fe514/artifacts/model
        ports:
        - containerPort: 5000
        volumeMounts:
          - mountPath: /dev/shm
            name: dshm
      volumes:
        - emptyDir:
            medium: Memory
          name: dshm
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: mlflow-t1
  namespace: qooba
spec:
  hosts:
  - "*"
  gateways:
  - qooba/mlflow-serving-gateway
  http:
  - match:
    - uri:
        prefix: /serving/qooba/t1
    rewrite:
        uri: /
    route:
    - destination:
        port:
          number: 5000
        host: mlflow-t1
---
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: mlflow-serving-gateway
  namespace: qooba
spec:
  selector:
    istio: ingressgateway
  servers:
  - hosts:
    - '*'
    port:
      name: http
      number: 5000
      protocol: HTTP

Each time a new model is promoted, ArgoCD applies a new deployment with the new model S3 path:

- name: MODEL
  value: s3://qooba/mlflow/1/e0167f65abf4429b8c58f56b547fe514/artifacts/model

Inference services

Finally, we can access the model externally and generate predictions. Please note that in this article the model is deployed in the same k8s namespace (in a real solution the model would be deployed on a separate k8s cluster), thus to access the model I have to send the authservice_session cookie, otherwise the request will be redirected to the dex login page.

import json
import requests
import getpass

authservice_session = getpass.getpass()

headers={
    'Cookie': f'authservice_session={authservice_session}',
    'Content-Type': 'application/json'
}

data={
    "columns": ["fixed acidity","volatile acidity","citric acid","residual sugar","chlorides","free sulfur dioxide","total sulfur dioxide","density","pH","sulphates","alcohol"],
    "data": [[7.4,0.7,0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4],
[7.8,0.88,0,2.6,0.098,25,67,0.9968,3.2,0.68,9.8]]
}

url='http://qooba-ai:31382/serving/qooba/t1/invocations'
requests.post(url, headers=headers,data=json.dumps(data)).text

# Response: [5.576883967129615, 5.50664776916154]

AI Scissors – sharp cut with neural networks

scissors

Cutting out the photo background is one of the most tedious graphical tasks. In this article I will show how to simplify it using neural networks.

I will use the U^2-Net network, which is described in detail in the arxiv article, and the python library rembg to create a ready-to-use drag and drop web application which you can run as a docker image.

The project code is available on my github https://github.com/qooba/aiscissors
You can also use ready docker image: https://hub.docker.com/repository/docker/qooba/aiscissors

Before you continue reading, please watch a quick introduction:

Neural network

To correctly remove the image background we need to select the most visually attractive (salient) objects in the image, a task covered by Salient Object Detection (SOD). To combine low memory and computation cost with results competitive with state-of-the-art methods, the novel U^2-Net architecture will be used.

U-Net convolutional networks have a characteristic U shape with a symmetric encoder-decoder structure. At each encoding stage the feature maps are downsampled (torch.nn.MaxPool2d) and then upsampled at each decoding stage (torch.nn.functional.upsample). The downsampled features are transferred and concatenated with the upsampled features using skip (residual) connections.
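To make this encode/decode pattern concrete, here is a minimal PyTorch sketch (a simplified illustration, not the actual U^2-Net code referenced below): it downsamples with MaxPool2d, upsamples with F.interpolate (the current name for the upsample operation), and concatenates encoder features with decoder features through a skip connection.

import torch
import torch.nn as nn
import torch.nn.functional as F

class TinyUNet(nn.Module):
    """Minimal two-level U-Net-style network producing a 1-channel saliency map."""
    def __init__(self, channels=16):
        super().__init__()
        self.enc1 = nn.Conv2d(3, channels, 3, padding=1)
        self.enc2 = nn.Conv2d(channels, channels, 3, padding=1)
        self.pool = nn.MaxPool2d(2)                      # downsampling
        self.dec = nn.Conv2d(2 * channels, channels, 3, padding=1)
        self.out = nn.Conv2d(channels, 1, 3, padding=1)

    def forward(self, x):
        e1 = F.relu(self.enc1(x))              # encoder, full resolution
        e2 = F.relu(self.enc2(self.pool(e1)))  # encoder, half resolution
        d1 = F.interpolate(e2, scale_factor=2, mode="bilinear", align_corners=False)
        d1 = F.relu(self.dec(torch.cat([d1, e1], dim=1)))  # skip connection
        return torch.sigmoid(self.out(d1))

mask = TinyUNet()(torch.randn(1, 3, 64, 64))  # -> (1, 1, 64, 64) saliency mask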

The U^2-Net network uses a two-level nested U-structure where the main architecture is a U-Net-like encoder-decoder and each stage contains a residual U-block. Each residual U-block repeats the downsampling/upsampling procedure, with its layers again connected through residual connections.

neural network architecture

The nested U-structure extracts and aggregates the features at each level and enables capturing local and global information from shallow and deep layers.

The U^2-Net architecture is precisely described in the arxiv article. Moreover, we can go through the pytorch model definitions of U2NET and U2NETP.

Additionally, the authors shared the pretrained models: U2NET (176.3 MB) and U2NETP (4.7 MB).

The lighter U2NETP version is only 4.7 MB thus it can be used in mobile applications.

Web application

The neural network is wrapped with the rembg library, which automatically downloads the pretrained networks and provides a simple python api. To simplify the usage I have decided to create a drag and drop web application (https://github.com/qooba/aiscissors)
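For reference, rembg can also be used directly from python, without the web UI; here is a minimal sketch (note that the exact import path and signature may differ between rembg versions):

# Minimal background removal with rembg (recent versions export remove()
# at the top level of the package; older ones exposed it in rembg.bg).
from rembg import remove

with open("input.jpg", "rb") as f:
    input_bytes = f.read()

output_bytes = remove(input_bytes)  # PNG bytes with transparent background

with open("output.png", "wb") as f:
    f.write(output_bytes)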

In the application you can drag and drop an image and then compare the versions with and without background side by side.

web application

You can simply run the application using docker image:

docker run --name aiscissors -d -p 8000:8000 --rm -v $(pwd)/u2net_models:/root/.u2net qooba/aiscissors 

If you have a GPU card you can use it:

docker run --gpus all  --name aiscissors -d -p 8000:8000 --rm -v $(pwd)/u2net_models:/root/.u2net qooba/aiscissors 

To use the GPU, additional NVIDIA drivers (included in the NVIDIA CUDA Toolkit) are needed.

When you run the container the pretrained models are downloaded, thus I have mounted the local directory u2net_models to /root/.u2net to avoid downloading them each time the container starts.

References

https://arxiv.org/pdf/2005.09007.pdf

https://github.com/NathanUA/U-2-Net

https://github.com/danielgatis/rembg

Qin, Xuebin; Zhang, Zichen; Huang, Chenyang; Dehghan, Masood; Zaiane, Osmar; Jagersand, Martin: "U2-Net: Going Deeper with Nested U-Structure for Salient Object Detection", Pattern Recognition 106, 107404 (2020)

“Hey Google” with Rasa – complete CI/CD solution for multilingual chatbots

old phone

In this article I will show how to build a complete CI/CD solution for building, training and deploying multilingual chatbots.
I will use the Rasa framework, GitLab pipelines, Minio and Redis to build a simple two-language Google Assistant.

The project code is available on my github https://github.com/qooba/heygoogle-with-rasa

Before you continue reading, please watch a quick introduction:

Architecture

architecture diagram

The solution contains several components thus I will describe each of them.

Google actions

To build a Google Assistant action we need to create and configure a Google Actions project.
google actions create

We will build our own NLU engine, thus we will start with a blank project.
Then we need to install the gactions CLI to manage the project from the command line.
To access your projects you need to authenticate using the command:

gactions login

If you want, you can create the project using templates:

gactions init

To simplify the tutorial I have included the configuration in the repository. You will need to set your project id in settings.yaml and the webhook configuration using your ngrok address.
The configuration can be deployed using the command:

gactions push

Ngrok

As mentioned before, for development purposes I have used ngrok to proxy the traffic from a public endpoint (used as the webhook destination) to localhost:8081:

ngrok http 8081

NGINX with LuaJIT

Currently it is not possible to set different webhook addresses for different languages in a Google Actions project, thus I have used NGINX with LuaJIT to route the traffic to the proper language container.
The information about the language context is included in the request body, which can be handled using a Lua script:

server {
        listen 80;
        resolver 127.0.0.11 ipv6=off;
        location / {
            set $target '';
            access_by_lua '
                local cjson = require("cjson")
                ngx.req.read_body()
                local text = ngx.var.request_body
                local value = cjson.new().decode(text)
                local lang = string.sub(value["user"]["locale"],1,2)
                ngx.var.target = "http://heygoogle-" .. lang
            ';
            proxy_pass $target;
        }
    }

Rasa application

Rasa is one of the most popular frameworks for building chatbots. I have decided to create a separate docker container for each language, which gives flexibility in terms of scalability and deployment.
Dockerfile (development version with watchdog) for the rasa application (qooba/rasa:1.10.10_app):

FROM rasa/rasa:1.10.10
USER root
RUN pip3 install python-jose watchdog[watchmedo]
ENTRYPOINT watchmedo auto-restart -d . -p '*.py' --recursive -- python3 app.py

Using the default rasa engine you have to restart the container whenever you want to deploy a newly retrained model, thus I have decided to wrap it with a simple python application which additionally listens on a Redis PubSub topic and waits for events that reload the model without restarting the whole application. Additionally, there are separate topics for different languages, thus we can deploy and reload the model for a specific language only.
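A minimal sketch of such a reload loop is shown below (simplified compared to the actual app.py in the repository; the download_model helper is a placeholder for fetching the model package from Minio, and Agent.load is the Rasa 1.x API):

# Simplified sketch of the model-reloading wrapper (the real app.py in the
# repository additionally serves the chatbot webhook itself).
import redis
from rasa.core.agent import Agent

LANG = "en"
TOPIC = f"heygoogle-{LANG}"

def download_model(model_name: str) -> str:
    # placeholder: download the model package from the Minio bucket
    # (e.g. with boto3) and return the local path
    return f"/app/models/{model_name}"

agent = None

client = redis.Redis(host="redis", port=6379, db=0)
pubsub = client.pubsub()
pubsub.subscribe(TOPIC)

for message in pubsub.listen():
    if message["type"] != "message":
        continue
    model_name = message["data"].decode()
    agent = Agent.load(download_model(model_name))  # hot-swap, no restart
    print(f"Reloaded model {model_name} for language {LANG}")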

Redis

In this solution Redis has two responsibilities:
* EventBus – as mentioned above, the chatbot app listens for events sent from the GitLab pipeline worker.
* Session Store – it keeps the conversation state, thus we can simply scale the chatbots.

We can simply run Redis using the command:

docker run --name redis -d --rm --network gitlab redis

Minio

Minio is used as the Rasa Model Store (Rasa supports the S3 protocol). After model training, the GitLab pipeline worker uploads the model package to Minio. Each language has a separate bucket:

model store

To run Minio we will use the following command (for the whole solution setup use run.sh where the environment variables are set):

docker run -d --rm -p 9000:9000 --network gitlab --name minio \
  -e "MINIO_ACCESS_KEY=$MINIO_ACCESS_KEY" \
  -e "MINIO_SECRET_KEY=$MINIO_SECRET_KEY" \
  -v $(pwd)/minio/data:/data \
  minio/minio server /data

Gitlab pipelines

In this solution I have used GitLab as the git repository and CI/CD engine.
You can simply run GitLab locally using the gitlab docker image:

docker run -d --rm -p 80:80 -p 8022:22 -p 443:443 --name gitlab --network gitlab \
  --hostname gitlab \
  -v $(pwd)/gitlab/config:/etc/gitlab:Z \
  -v $(pwd)/gitlab/logs:/var/log/gitlab:Z \
  -v $(pwd)/gitlab/data:/var/opt/gitlab:Z \
  gitlab/gitlab-ce:latest

Notice that I have used the gitlab hostname (without it the pipelines do not work correctly on localhost), thus you will need to add an appropriate entry to /etc/hosts:

127.0.1.1   gitlab

Now you can create a new project (in my case I called it heygoogle).
Most likely port 22 is already in use, thus for ssh I used port 8022.
You can clone the project using the command (remember to set up ssh keys):

git clone ssh://git@localhost:8022/root/heygoogle.git

Before you can use the GitLab runner you have to configure at least one worker.
First get the registration token (Settings -> CI/CD -> Runners):

gitlab runners

and run once:

docker run --rm --network gitlab -v /srv/gitlab-runner/config:/etc/gitlab-runner gitlab/gitlab-runner register \
  --non-interactive \
  --docker-network-mode gitlab \
  --executor "docker" \
  --docker-image ubuntu:latest \
  --url "http://gitlab/" \
  --registration-token "TWJABbyzkVWVAbJc9bSx" \
  --description "docker-runner" \
  --tag-list "docker,aws" \
  --run-untagged="true" \
  --locked="false" \
  --access-level="not_protected"

Now you can run the gitlab-runner container:

docker run -d --rm --name gitlab-runner --network gitlab \
     -v /srv/gitlab-runner/config:/etc/gitlab-runner \
     -v /var/run/docker.sock:/var/run/docker.sock \
     gitlab/gitlab-runner:latest

To create a pipeline you simply commit the .gitlab-ci.yml file into your repository.
In our case it contains two steps (one for each language):

variables:
  MINIO_ACCESS_KEY: AKIAIOSFODNN7EXAMPLE
  MINIO_SECRET_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  PROJECT_NAME: heygoogle

stages:
  - process_en
  - process_pl

step-1:
  image: qooba/rasa:1.10.10
  stage: process_en
  script:
    - ./pipeline.sh en
  interruptible: true

step-2:
  image: qooba/rasa:1.10.10
  stage: process_pl
  script:
    - ./pipeline.sh pl
  interruptible: true

Gitlab pipeline steps use qooba/rasa:1.10.10 docker image:

FROM rasa/rasa:1.10.10
USER root
RUN apt update && apt install git -yq
ENTRYPOINT /bin/bash

thus they have a complete rasa environment.

The pipeline.sh script:

#!/bin/bash

lang=$1
echo "Processing $lang"

if (($(git diff-tree --no-commit-id --name-only -r $CI_COMMIT_SHA | grep ^$lang/ | wc -l) > 0)); then
   echo "Training $lang"
   cd $lang
   rasa train
   rasa test
   cd ..
   python3 pipeline.py --language $lang
else
   echo "No changes in $lang, skipping"
fi

The script checks if something has changed in the chosen language directory, trains and tests the model, and finally uploads the trained model to Minio and publishes an event to Redis using pipeline.py:

import os
import boto3
import redis
from botocore.client import Config

def upload_model(project_name: str, language: str, model: str):
    s3=boto3.resource("s3",endpoint_url="http://minio:9000",
        aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
        aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
        config=Config(signature_version="s3v4"),region_name="us-east-1")
    bucket_name=f'{project_name}-{language}'
    print(f"BUCKET NAME: {bucket_name}") 
    bucket_exists=s3.Bucket(bucket_name) in s3.buckets.all() or s3.create_bucket(Bucket=bucket_name)
    s3.Bucket(bucket_name).upload_file(f"/builds/root/{project_name}/{language}/models/{model}",model)


def publish_event(project_name: str, language: str, model: str):
    topic_name=f'{project_name}-{language}'
    print(f"TOPIC NAME: {topic_name}") 
    client=redis.Redis(host="redis", port=6379, db=0)
    client.publish(topic_name, model)

if __name__ == '__main__':
    import argparse

    project_name=os.environ["PROJECT_NAME"]

    parser = argparse.ArgumentParser(description='Upload the trained model and publish a reload event')
    parser.add_argument('--language', metavar='path', required=True,
                        help='the model language')

    args = parser.parse_args()
    model=os.listdir(f"/builds/root/{project_name}/{args.language}/models/")[0]
    print("Uploading model")
    upload_model(project_name=project_name, language=args.language, model=model)

    print("Publishing event")
    publish_event(project_name=project_name, language=args.language, model=model)

Now after each change in the repository GitLab starts the pipeline run:
gitlab pipeline

gitlab step

Summary

We have built a complete solution for creating, training, testing and deploying chatbots.
Additionally, the solution supports multilingual chatbots while keeping scalability and deployment flexibility.
Moreover, trained models can be continuously deployed without chatbot downtime (for Kubernetes environments
a Canary Deployment could be another option).
Finally, we have integrated the solution with Google Actions and created a simple chatbot.