スポンサーリンク
これまでmlops関係のモジュールをいくつか試してみました
今回はkedroの導入までやってみます
目次
スポンサーリンク
この記事でわかること
kedroの導入方法
1.実行環境
Jetson Xavier NX
ubuntu18.04
Python 3.6.9
docker
2. インストール
以下をコマンドでインストールできます
pip install kedro
インストールの確認は以下コマンドで行えます
kedro info _ _ | | _____ __| |_ __ ___ | |/ / _ \/ _` | '__/ _ \ | < __/ (_| | | | (_) | |_|\_\___|\__,_|_| \___/ v0.18.14
3. コード作成
3.1 nodeの作成
pythonで関数を記述して、nodeにfunc,input,outputを登録します
今回のサンプルコードでは以下のようになってます
func:return_greeting
|- input:なし
|- output:my_salutation
func:join_statements
|- input:greeting
|- output:my_message
from kedro.pipeline import node # Prepare first node def return_greeting(): return "Hello" # Prepare second node def join_statements(greeting): return f"{greeting} Kedro!" return_greeting_node = node(func=return_greeting, inputs=None, outputs="my_salutation") join_statements_node = node( join_statements, inputs="my_salutation", outputs="my_message" )
3.2 piplineの作成
piplineではnodeの依存関係と実行順序を記述します
from kedro.pipeline import pipeline # Assemble nodes into a pipeline greeting_pipeline = pipeline([return_greeting_node, join_statements_node])
3.3 DataCatalog
DataCatalogは様々なデータ形式に対応したモジュールです
今回は単純にmy_salutationの変数をいれる箱として定義します
from kedro.io import DataCatalog, MemoryDataSet # Prepare a data catalog data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})
3.4 runnerの作成
最後にpiplineを実行するrunnerを作成します
# Create a runner to run the pipeline runner = SequentialRunner() # Run the pipeline print(runner.run(greeting_pipeline, data_catalog))
4.実行
4.1 完成したコード
"""Contents of hello_kedro.py""" from kedro.io import DataCatalog, MemoryDataSet from kedro.pipeline import node, pipeline from kedro.runner import SequentialRunner # Prepare a data catalog data_catalog = DataCatalog({"my_salutation": MemoryDataSet()}) # Prepare first node def return_greeting(): return "Hello" return_greeting_node = node(return_greeting, inputs=None, outputs="my_salutation") # Prepare second node def join_statements(greeting): return f"{greeting} Kedro!" join_statements_node = node( join_statements, inputs="my_salutation", outputs="my_message" ) # Assemble nodes into a pipeline greeting_pipeline = pipeline([return_greeting_node, join_statements_node]) # Create a runner to run the pipeline runner = SequentialRunner() # Run the pipeline print(runner.run(greeting_pipeline, data_catalog))
4.2 実行コマンド
python hello_kedro.py
実行結果は以下のようになります
INFO Running node: return_greeting(None) -> [my_salutation] INFO Saving data to 'my_salutation' (MemoryDataset)... INFO Completed 1 out of 2 tasks INFO Loading data from 'my_salutation' (MemoryDataset)... INFO Running node: join_statements([my_salutation]) -> [my_message] INFO Saving data to 'my_message' (MemoryDataset)... INFO Completed 2 out of 2 tasks INFO Pipeline execution completed successfully. INFO Loading data from 'my_message' (MemoryDataset)... {'my_message': 'Hello Kedro!'}
スポンサーリンク