こすたろーんエンジニアの試行錯誤部屋

作成物の備忘録を書いていきますー

【mlops】kedroを使ってみた

スポンサーリンク

これまでmlops関係のモジュールをいくつか試してみました
今回はkedroの導入までやってみます

目次

スポンサーリンク

この記事でわかること

kedroの導入方法

1.実行環境

Jetson Xavier NX
ubuntu18.04
Python 3.6.9
docker

2. インストール

以下をコマンドでインストールできます

pip install kedro

インストールの確認は以下コマンドで行えます

kedro info

 _            _
| | _____  __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
|   <  __/ (_| | | | (_) |
|_|\_\___|\__,_|_|  \___/
v0.18.14

3. コード作成

3.1 nodeの作成

pythonで関数を記述して、nodeにfunc,input,outputを登録します
今回のサンプルコードでは以下のようになってます
func:return_greeting
|- input:なし
|- output:my_salutation

func:join_statements
|- input:greeting
|- output:my_message

from kedro.pipeline import node

# Prepare first node
def return_greeting():
    return "Hello"

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"

return_greeting_node = node(func=return_greeting, inputs=None, outputs="my_salutation")

join_statements_node = node(
    join_statements, inputs="my_salutation", outputs="my_message"
)

3.2 piplineの作成

piplineではnodeの依存関係と実行順序を記述します

from kedro.pipeline import pipeline

# Assemble nodes into a pipeline
greeting_pipeline = pipeline([return_greeting_node, join_statements_node])

3.3 DataCatalog

DataCatalogは様々なデータ形式に対応したモジュールです
今回は単純にmy_salutationの変数をいれる箱として定義します

from kedro.io import DataCatalog, MemoryDataSet

# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})

3.4 runnerの作成

最後にpiplineを実行するrunnerを作成します

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(greeting_pipeline, data_catalog))

4.実行

4.1 完成したコード

"""Contents of hello_kedro.py"""
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, pipeline
from kedro.runner import SequentialRunner

# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})

# Prepare first node
def return_greeting():
    return "Hello"


return_greeting_node = node(return_greeting, inputs=None, outputs="my_salutation")

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"


join_statements_node = node(
    join_statements, inputs="my_salutation", outputs="my_message"
)

# Assemble nodes into a pipeline
greeting_pipeline = pipeline([return_greeting_node, join_statements_node])

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(greeting_pipeline, data_catalog))

4.2 実行コマンド

python hello_kedro.py

実行結果は以下のようになります

INFO     Running node: return_greeting(None) -> [my_salutation] 
INFO     Saving data to 'my_salutation' (MemoryDataset)... 
INFO     Completed 1 out of 2 tasks 
INFO     Loading data from 'my_salutation' (MemoryDataset)... 
INFO     Running node: join_statements([my_salutation]) -> [my_message] 
INFO     Saving data to 'my_message' (MemoryDataset)... 
INFO     Completed 2 out of 2 tasks 
INFO     Pipeline execution completed successfully.  
INFO     Loading data from 'my_message' (MemoryDataset)... 
{'my_message': 'Hello Kedro!'}

スポンサーリンク

参考

docs.kedro.org