Distributed Feature Store with Feast and Dask


In this article I will show how to combine the Feast and Dask libraries to create a distributed feature store.

Before you continue reading, please watch the short introduction:

The feature store is a very important component of the MLOps process which helps to manage historical and online features. With Feast we can, for example, read historical features from Parquet files and then materialize them to Redis as an online store.

But what can we do if the historical data size exceeds our machine's capabilities? The Dask library can help to solve this problem. Using Dask we can distribute the data and calculations across multiple machines. Dask can run on a single machine or on a cluster (Kubernetes, YARN, cloud, HPC, SSH, manual setup), so we can start on a single machine and then smoothly move to a cluster if needed. Moreover, thanks to Dask we can read a bunch of Parquet files using a path pattern (see the sketch below) and run distributed training with libraries like scikit-learn or XGBoost.
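A minimal sketch of reading many Parquet files with a single path pattern (the path and column names are illustrative):

import dask.dataframe as dd

# read all matching Parquet files; the data stays partitioned across workers
df = dd.read_parquet("data/features_*.parquet")

# computations are lazy and distributed until .compute() is called
means = df.groupby("entity_id")["f0"].mean().compute()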

Feast with Dask

I have prepared a ready-to-use Docker image, so you can simply reproduce all the steps.

docker run --name feast -d --rm -p 8888:8888 -p 8787:8787 qooba/feast:dask

Then check the Jupyter notebook token, which you will need to log in:

docker logs -f feast

And open the notebook (use the token to log in):

http://localhost:8888/notebooks/feast-dask/feast-dask.ipynb#/slide-0-0

The notebook is also available at https://github.com/qooba/feast-dask/blob/main/docker/feast-dask.ipynb, but with the Docker image you will have the whole environment ready.

In the notebook you can find all the steps:

Random data generation

I have used NumPy and scikit-learn to generate 1M entities and historical data (10 features generated with the make_hastie_10_2 function) for 14 days, which I save as a Parquet file (1.34 GB).
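A rough sketch of how such a dataset can be generated (the entity and timestamp column names are illustrative):

import numpy as np
import pandas as pd
from sklearn.datasets import make_hastie_10_2

n_entities = 1_000_000
frames = []
for day in range(14):
    # 10 features (f0..f9) and a target y in {-1, 1} per entity per day
    X, y = make_hastie_10_2(n_samples=n_entities, random_state=day)
    df = pd.DataFrame(X, columns=[f"f{i}" for i in range(10)])
    df["y"] = y
    df["entity_id"] = np.arange(n_entities)
    df["datetime"] = pd.Timestamp("2021-01-01") + pd.Timedelta(days=day)
    frames.append(df)

pd.concat(frames).to_parquet("data.parquet")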

Feast configuration and registry

feature_store.yaml – where I use a local registry and an SQLite database as the online store (sketched below).

features.py – with one file source (the generated Parquet file) and the feature definitions (sketched below).
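A minimal feature_store.yaml for this setup might look like this (paths are illustrative):

project: feast_dask
registry: data/registry.db
provider: local
online_store:
  type: sqlite
  path: data/online_store.db

And a sketch of features.py matching the generated data, following the Feast API version used in the image (the entity and column names are assumptions):

from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

# file source pointing at the generated Parquet file
data_source = FileSource(
    path="data.parquet",
    event_timestamp_column="datetime",
)

entity = Entity(name="entity_id", value_type=ValueType.INT64)

# feature view exposing the ten generated features and the target
my_statistics = FeatureView(
    name="my_statistics",
    entities=["entity_id"],
    ttl=timedelta(days=14),
    features=[Feature(name=f"f{i}", dtype=ValueType.FLOAT) for i in range(10)]
             + [Feature(name="y", dtype=ValueType.FLOAT)],
    batch_source=data_source,
)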

To create the Feast registry we have to run:

feast apply

Additionally, I have created a simple library which helps to inspect the Feast schema directly in the Jupyter notebook:

pip install feast-schema

from feast_schema import FeastSchema

FeastSchema('.').show_schema()

Feast schema

Dask cluster setup

Then I set up a simple Dask cluster with a scheduler and 4 workers.

dask-scheduler --host 0.0.0.0 --port 8786 --bokeh-port 8787 &

dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8701 &
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8702 &
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8703 &
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8704 &

The Dask dashboard is exposed on port 8787, so you can follow the Dask metrics at:

http://localhost:8787/status

Dask dashboard
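To check that the workers have joined the cluster, we can connect a Dask client to the scheduler from the notebook:

from dask.distributed import Client

# connect to the scheduler started above; the repr shows workers and memory
client = Client("tcp://localhost:8786")
client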

Fetching historical features

In the next step I have fetched the historical features using Feast with Dask:

from feast import FeatureStore

store = FeatureStore(repo_path='.')
training_df = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=[
        "my_statistics:f0",
        "my_statistics:f1",
        "my_statistics:f2",
        "my_statistics:f3",
        "my_statistics:f4",
        "my_statistics:f5",
        "my_statistics:f6",
        "my_statistics:f7",
        "my_statistics:f8",
        "my_statistics:f9",
        "my_statistics:y",
    ],
).to_df()
training_df

This takes about 14 seconds, which is much faster than Feast without Dask:

Pandas:
CPU times: user 2min 51s, sys: 6.64 s, total: 2min 57s
Wall time: 2min 52s

Dask:
CPU times: user 458 ms, sys: 65.3 ms, total: 524 ms
Wall time: 14.7 s

Distributed training with Sklearn

After fetching the data we can start the training. We can use the fetched Pandas dataframe, but we can also fetch a Dask dataframe instead:

from feast import FeatureStore
store=FeatureStore(repo_path='.')
training_dd = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=[
        "my_statistics:f0",
        "my_statistics:f1",
        "my_statistics:f2",
        "my_statistics:f3",
        "my_statistics:f4",
        "my_statistics:f5",
        "my_statistics:f6",
        "my_statistics:f7",
        "my_statistics:f8",
        "my_statistics:f9",
        "my_statistics:y",
    ]
).evaluation_function()

Using the Dask dataframe we can continue distributed training on the distributed data.
On the other hand, if we use a Pandas dataframe, the data will be collected to a single node.

To start distributed training with scikit-learn we can use the Joblib library with the Dask backend:

import joblib
from sklearn.ensemble import GradientBoostingClassifier
from dask_ml.model_selection import train_test_split

predictors = training_dd[["f0","f1","f2","f3","f4","f5","f6","f7","f8","f9"]]
targets = training_dd[["y"]]

X_train, X_test, y_train, y_test = train_test_split(predictors, targets, test_size=.3)

# the Dask backend routes scikit-learn's Joblib parallelism to the cluster workers
with joblib.parallel_backend('dask'):
    clf = GradientBoostingClassifier(n_estimators=100, learning_rate=1.0,
                                     max_depth=1, random_state=0, verbose=1).fit(X_train, y_train)

    score = clf.score(X_test, y_test)

score

Online features materialization

Finally I have materialized the data to the local Sqlite database:

feast materialize 2021-01-01T01:00:00 2021-01-31T23:59:00

In this case the data for materialization is also prepared using Dask.

Feast with AI – feed your MLflow models with feature store


In this article I will show how to prepare a complete MLOps solution based on the Feast feature store and the MLflow platform.

Before you continue reading, please watch the short introduction:

The whole solution will be deployed on Kubernetes (mlflow_feast.yaml).


We will use:
* Feast – as the feature store
* MLflow – as the model repository
* Minio – as S3 storage
* Jupyter Notebook – as the workspace
* Redis – as the online feature store


To better visualize the whole process, we will use a propensity-to-buy example based on Kaggle examples and data.


We start in the Jupyter Notebook, where we prepare the Feast feature store schema, which is kept in S3.
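In this setup feature_store.yaml keeps the registry on S3 and uses Redis as the online store; a sketch (the bucket name is illustrative, the Redis address matches the Kubernetes service defined below):

project: propensity
registry: s3://feast/registry.db
provider: local
online_store:
  type: redis
  connection_string: redis.qooba.svc.cluster.local:6379,db=0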

We can simply inspect the Feast schema in the Jupyter notebook:

from feast import FeatureStore
from IPython.core.display import display, HTML
import json
from json2html import *
import warnings
warnings.filterwarnings('ignore')

class FeastSchema:
    def __init__(self, repo_path: str):
        self.store = FeatureStore(repo_path=repo_path)

    def show_schema(self, skip_meta: bool = False):
        feast_schema = self.__project_show_schema(skip_meta)
        display(HTML(json2html.convert(json=feast_schema)))

    def show_table_schema(self, table: str, skip_meta: bool = False):
        feature_tables_dictionary = self.__project_show_schema(skip_meta)
        display(HTML(json2html.convert(json={table: feature_tables_dictionary[table]})))

    def __project_show_schema(self, skip_meta: bool = False):
        # collect the entity specifications so they can be embedded
        # into the feature view definitions below
        entities_dictionary = {}
        for entity in self.store.list_entities():
            entity_spec = entity.to_dict()['spec']
            entities_dictionary[entity_spec['name']] = entity_spec

        feature_tables_dictionary = {}
        for feature_table in self.store.list_feature_views():
            feature_table_dict = json.loads(str(feature_table))
            feature_table_spec = feature_table_dict['spec']
            feature_table_name = feature_table_spec['name']
            feature_table_spec.pop('name', None)
            if 'entities' in feature_table_spec:
                # replace entity names with their full specifications
                feature_table_spec['entities'] = [
                    entities_dictionary[entity]
                    for entity in feature_table_spec['entities']
                ]

            if not skip_meta:
                feature_table_spec['meta'] = feature_table_dict['meta']
            else:
                feature_table_spec.pop('input', None)
                feature_table_spec.pop('ttl', None)
                feature_table_spec.pop('online', None)

            feature_tables_dictionary[feature_table_name] = feature_table_spec

        return feature_tables_dictionary


FeastSchema(".").show_schema()
#FeastSchema(".").show_schema(skip_meta=True)
#FeastSchema(".").show_table_schema('driver_hourly_stats')

In our case we store the data as Apache Parquet files in an S3 bucket.
Using Feast we can fetch the historical features and train the model with the scikit-learn library:

import os
from datetime import datetime

import joblib
import pandas as pd
import sklearn.metrics
from pyarrow import fs
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from feast import FeatureStore

bucket_name="propensity"
filename="training_sample"

store = FeatureStore(repo_path=".")

s3 = fs.S3FileSystem(endpoint_override=os.environ.get("FEAST_S3_ENDPOINT_URL"))
entity_df=pd.read_parquet(f'{bucket_name}/{filename}_entities.parquet', filesystem=s3)
entity_df["event_timestamp"]=datetime.now()


training_df = store.get_historical_features(
    entity_df=entity_df, 
    feature_refs = [
        'propensity_data:basket_icon_click',
        'propensity_data:basket_add_list',
        'propensity_data:basket_add_detail',
        'propensity_data:sort_by',
        'propensity_data:image_picker',
        'propensity_data:account_page_click',
        'propensity_data:promo_banner_click',
        'propensity_data:detail_wishlist_add',
        'propensity_data:list_size_dropdown',
        'propensity_data:closed_minibasket_click',
        'propensity_data:checked_delivery_detail',
        'propensity_data:checked_returns_detail',
        'propensity_data:sign_in',
        'propensity_data:saw_checkout',
        'propensity_data:saw_sizecharts',
        'propensity_data:saw_delivery',
        'propensity_data:saw_account_upgrade',
        'propensity_data:saw_homepage',
        'propensity_data:device_mobile',
        'propensity_data:device_computer',
        'propensity_data:device_tablet',
        'propensity_data:returning_user',
        'propensity_data:loc_uk',
        'propensity_data:ordered'
    ],
).to_df()

predictors = training_df.drop(['propensity_data__ordered','UserID','event_timestamp'], axis=1)
targets = training_df['propensity_data__ordered']

X_train, X_test, y_train, y_test = train_test_split(predictors, targets, test_size=.3)

# input_params holds the run parameters (e.g. var_smoothing passed via mlflow run)
classifier=GaussianNB(var_smoothing=input_params['var_smoothing'])
classifier=classifier.fit(X_train,y_train)

predictions=classifier.predict(X_test)

conf_matrix=sklearn.metrics.confusion_matrix(y_test,predictions)
ac_score=sklearn.metrics.accuracy_score(y_test, predictions)

propensity_model_path = 'propensity.joblib'
joblib.dump(classifier, propensity_model_path)

artifacts = {
    "propensity_model": propensity_model_path,
    "feature_store": "feature_store.yaml"
}

The model will use online Feast features from Redis as well as additional features sent in the request, so we need to wrap the MLflow model and define it as follows:

import mlflow.pyfunc
class PropensityWrapper(mlflow.pyfunc.PythonModel):

    def load_context(self, context):
        import joblib
        from feast import FeatureStore
        import pandas as pd 
        import os

        self.model = joblib.load(context.artifacts["propensity_model"])
        self.store = FeatureStore(repo_path=os.environ.get("FEAST_REPO_PATH"))

    def predict(self, context, model_input):
        users=list(model_input.to_dict()["UserID"].values())

        feature_vector = self.store.get_online_features(
            feature_refs=[
                'propensity_data:basket_icon_click',
                'propensity_data:basket_add_list',
                'propensity_data:basket_add_detail',
                'propensity_data:sort_by',
                'propensity_data:image_picker',
                'propensity_data:account_page_click',
                'propensity_data:promo_banner_click',
                'propensity_data:detail_wishlist_add',
                'propensity_data:list_size_dropdown',
                'propensity_data:closed_minibasket_click',
                'propensity_data:checked_delivery_detail',
                'propensity_data:checked_returns_detail',
                'propensity_data:sign_in',
                'propensity_data:saw_checkout',
                'propensity_data:saw_sizecharts',
                'propensity_data:saw_delivery',
                'propensity_data:saw_account_upgrade',
                'propensity_data:saw_homepage',
                'propensity_data:returning_user',
                'propensity_data:loc_uk'
            ],
            entity_rows=[{"UserID": uid} for uid in users]
        ).to_dict()

        data=pd.DataFrame.from_dict(feature_vector)
        merged_data = pd.merge(model_input,data, how="inner", on=["UserID"], suffixes=('_x', '')).drop(['UserID'], axis=1)
        return self.model.predict(merged_data)

Now we can log the MLflow model to the repository:

from urllib.parse import urlparse

import mlflow
import mlflow.sklearn
import mlflow.pyfunc

conda_env=mlflow.pyfunc.get_default_conda_env()

with mlflow.start_run():

    #mlflow.log_param("var_smoothing", input_params['var_smoothing'])
    mlflow.log_metric("accuracy_score", ac_score)

    tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

    # a file-based tracking store does not support the model registry,
    # so the model is registered only for a remote tracking server
    if tracking_url_type_store != "file":
        mlflow.pyfunc.log_model("model",
                                registered_model_name="propensity_model",
                                python_model=PropensityWrapper(),
                                artifacts=artifacts,
                                conda_env=conda_env)
    else:
        mlflow.pyfunc.log_model("model",
                                python_model=PropensityWrapper(),
                                artifacts=artifacts,
                                conda_env=conda_env)

We can export the code and run it using the MLflow CLI:

mlflow run . --no-conda --experiment-name="propensity" -P var_smoothing=1e-9
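The mlflow run command expects an MLproject file in the repository root; a minimal sketch (the entry point command and script name are assumptions):

name: propensity

entry_points:
  main:
    parameters:
      var_smoothing: {type: float, default: 1e-9}
    command: "python train.py --var-smoothing {var_smoothing}"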

Now we need to materialize features to Redis:

feast materialize 2021-03-22T23:42:00 2021-06-22T23:42:00

Using MLflow we can simply deploy the model as a microservice in Kubernetes.
In our case we want to deploy the model models:/propensity_model/Production,
which is currently assigned to the Production stage. During startup MLflow will automatically fetch the proper model from S3:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-serving
  namespace: qooba
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow-serving
      version: v1
  template:
    metadata:
      labels:
        app: mlflow-serving
        version: v1
    spec:
      containers:
      - image: qooba/mlflow:serving
        imagePullPolicy: IfNotPresent
        name: mlflow-serving
        env:
        - name: MLFLOW_TRACKING_URI
          value: http://mlflow.qooba.svc.cluster.local:5000
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: minio-auth
              key: username
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: minio-auth
              key: password
        - name: MLFLOW_S3_ENDPOINT_URL
          value: http://minio.qooba.svc.cluster.local:9000
        - name: FEAST_S3_ENDPOINT_URL
          value: http://minio.qooba.svc.cluster.local:9000
        - name: REDIS_TYPE
          value: REDIS
        - name: REDIS_CONNECTION_STRING
          value: redis.qooba.svc.cluster.local:6379,db=0
        - name: FEAST_TELEMETRY
          value: "false"
        - name: FEAST_REPO_PATH
          value: /feast_repository
        - name: PORT
          value: "5000"
        - name: MODEL
          value: models:/propensity_model/Production
        ports:
        - containerPort: 5000
        volumeMounts:
          - mountPath: /feast_repository
            name: config
      volumes:
        - name: config
          configMap:
            name: mlflow-serving
            items:
            - key: feature_store
              path: feature_store.yaml
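We can apply this manifest like any other Kubernetes resource:

kubectl apply -f mlflow_feast.yaml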

Now we can test the deployed model with an HTTP request:

import requests
import json

url="http://mlflow-serving.qooba.svc.cluster.local:5000/invocations"

headers={
    'Content-Type': 'application/json; format=pandas-records'
}

data=[
    {"UserID": "a720-6b732349-a720-4862-bd21-644732",
     'propensity_data:device_mobile': 1.0,
     'propensity_data:device_computer': 0.0,
     'propensity_data:device_tablet': 0.0
    }
]

response=requests.post(url, data=json.dumps(data), headers=headers)
response.text

For each request, the model fetches the client features (based on UserID) from Redis, merges them with the additional features from the HTTP request, and generates a prediction.