コンテンツにスキップ

ソースの利用(複数テーブル一括ロード)

概要

create_estat_sourceを使用して、複数の統計表を一括でロードする方法を説明します。EstatDltConfigのリストを渡すことで、テーブルごとにリソースを持つdlt sourceを生成できます。

利用シーン

  • 複数の統計表をまとめてロードしたい場合
  • テーブルごとに異なる書き込み戦略(merge / replace / append)を指定したい場合
  • dlt sourceとしてまとめて1つのパイプラインで実行したい場合

必要な準備

  1. e-Stat APIキーの設定

    export ESTAT_API_KEY="your-api-key-here"
    

  2. 必要なパッケージのインストール

    pip install estat-api-dlt-helper duckdb
    

コード例

import os

import dlt

from estat_api_dlt_helper import EstatDltConfig, create_estat_source


def main():
    """Load multiple e-Stat tables using create_estat_source."""
    app_id = os.getenv("ESTAT_API_KEY", "")

    configs = [
        EstatDltConfig(
            source={
                "app_id": app_id,
                "statsDataId": "0000020201",
                "limit": 100,
                "maximum_offset": 200,
            },
            destination={
                "destination": "duckdb",
                "dataset_name": "estat_multi",
                "table_name": "population",
                "write_disposition": "merge",
                "primary_key": ["time", "area", "cat01"],
            },
        ),
        EstatDltConfig(
            source={
                "app_id": app_id,
                "statsDataId": "0000020202",
                "limit": 100,
                "maximum_offset": 200,
            },
            destination={
                "destination": "duckdb",
                "dataset_name": "estat_multi",
                "table_name": "household",
                "write_disposition": "replace",
                "primary_key": None,
            },
        ),
    ]

    source = create_estat_source(configs)

    print(f"Source name: {source.name}")
    print(f"Resources: {list(source.resources.keys())}")

    # Create pipeline and run
    pipeline = dlt.pipeline(
        pipeline_name="estat_source_demo",
        destination="duckdb",
        dataset_name="estat_multi",
    )

    print("\nRunning pipeline...")
    info = pipeline.run(source)

    print("\nLoad completed!")
    print(f"Load info: {info}")


if __name__ == "__main__":
    main()

create_estat_source のパラメータ

パラメータ 説明
configs EstatDltConfigのリスト(各要素がリソースに対応) [EstatDltConfig(...), ...]
**source_kwargs dlt.sourceに渡す追加キーワード引数 name="my_source"

注意事項

EstatDltConfigtable_nameはソース内で一意である必要があります。同じtable_nameを複数の設定で使用するとValueErrorが発生します。

# NG: table_nameが重複している
configs = [
    EstatDltConfig(
        source={"app_id": app_id, "statsDataId": "0000020201"},
        destination={"destination": "duckdb", "dataset_name": "estat", "table_name": "stats"},
    ),
    EstatDltConfig(
        source={"app_id": app_id, "statsDataId": "0000020202"},
        destination={"destination": "duckdb", "dataset_name": "estat", "table_name": "stats"},  # 重複
    ),
]
# ValueError: Duplicate table names found: ['stats']

バリデーション

create_estat_sourceは以下の入力バリデーションを行います。

  • configsが空リストの場合、ValueErrorを送出します
  • 複数の設定で同じtable_nameが指定されている場合、ValueErrorを送出します

実行結果の例

Source name: estat_source
Resources: ['population', 'household']

Running pipeline...

Load completed!
Load info: ...

次のステップ