
API Reference

Main Configuration Classes

EstatDltConfig

The main configuration class for e-Stat API to dlt data integration. It combines source and destination settings and provides additional processing options for data extraction and loading.

Bases: BaseModel

Main configuration for e-Stat API to DLT integration.

Combines source and destination configurations with additional processing options for data extraction and loading.

Attributes:

Name Type Description
source SourceConfig

e-Stat API source configuration.

destination DestinationConfig

DLT destination configuration.

batch_size Optional[int]

Number of records per batch.

max_retries int

Maximum API retry attempts.

timeout Optional[int]

API request timeout in seconds.

Source code in src/estat_api_dlt_helper/config/models.py
class EstatDltConfig(BaseModel):
    """Main configuration for e-Stat API to DLT integration.

    Combines source and destination configurations with additional
    processing options for data extraction and loading.

    Attributes:
        source: e-Stat API source configuration.
        destination: DLT destination configuration.
        batch_size: Number of records per batch.
        max_retries: Maximum API retry attempts.
        timeout: API request timeout in seconds.
    """

    source: SourceConfig = Field(..., description="e-Stat API source configuration")
    destination: DestinationConfig = Field(
        ..., description="DLT destination configuration"
    )

    # Optional processing configuration
    batch_size: Optional[int] = Field(
        None, description="Number of records to process in each batch"
    )
    max_retries: int = Field(3, description="Maximum number of API retry attempts")
    timeout: Optional[int] = Field(None, description="API request timeout in seconds")

    # Data transformation options
    flatten_metadata: bool = Field(
        False, description="Whether to flatten metadata into table columns"
    )
    include_api_metadata: bool = Field(
        True, description="Whether to include API response metadata in the table"
    )

    model_config = ConfigDict(
        validate_assignment=True,
        extra="forbid",
    )
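As a usage sketch, the nested dictionary below mirrors the attributes documented above and can be passed as `EstatDltConfig(**config)`. The `app_id` value is a placeholder you must replace with your own key; the `statsDataId` is taken from the library's own example, and the commented defaults are those shown in the source.

```python
# Plain-dict sketch of the nested shape EstatDltConfig accepts.
# "YOUR_APP_ID" is a placeholder, not a real credential.
config = {
    "source": {
        "app_id": "YOUR_APP_ID",       # required: e-Stat API application ID
        "statsDataId": "0000020211",   # required: statistical table ID(s)
        "limit": 100000,               # optional: records per request
    },
    "destination": {
        "destination": "duckdb",       # required: dlt destination name
        "dataset_name": "demo",        # required: dataset/schema name
        "table_name": "demo",          # required: table name
        "write_disposition": "merge",  # default: "merge"
        "primary_key": ["time", "area", "cat01"],  # needed for merge
    },
    # optional processing options on EstatDltConfig itself
    "batch_size": None,
    "max_retries": 3,
    "timeout": None,
}
```

Because `extra="forbid"` is set on `EstatDltConfig`, any key outside the attributes listed above would raise a validation error.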

SourceConfig

Configuration class for the e-Stat API data source. Defines parameters for fetching statistical data, including authentication, selection of the statistical tables to fetch, and various options.

See the e-Stat API specification for parameter details.

Bases: BaseModel

Configuration for e-Stat API data source.

Defines parameters for fetching statistical data from e-Stat API, including authentication, data selection, and pagination options.

Attributes:

Name Type Description
app_id str

e-Stat API application ID for authentication.

statsDataId Union[str, List[str]]

Statistical table ID(s) to fetch.

lang Literal['J', 'E']

Language for API response (J: Japanese, E: English).

metaGetFlg Literal['Y', 'N']

Whether to fetch metadata.

cntGetFlg Literal['Y', 'N']

Whether to fetch only record count.

Source code in src/estat_api_dlt_helper/config/models.py
class SourceConfig(BaseModel):
    """Configuration for e-Stat API data source.

    Defines parameters for fetching statistical data from e-Stat API,
    including authentication, data selection, and pagination options.

    Attributes:
        app_id: e-Stat API application ID for authentication.
        statsDataId: Statistical table ID(s) to fetch.
        lang: Language for API response (J: Japanese, E: English).
        metaGetFlg: Whether to fetch metadata.
        cntGetFlg: Whether to fetch only record count.
    """

    app_id: str = Field(..., description="e-Stat API application ID (API key)")
    statsDataId: Union[str, List[str]] = Field(
        ...,
        description="Statistical table ID(s) to fetch. Can be a single ID or list of IDs",
    )
    lang: Literal["J", "E"] = Field("J", description="Language of the API response")
    metaGetFlg: Literal["Y", "N"] = Field(
        "Y", description="Whether to fetch metadata (Y/N)"
    )
    cntGetFlg: Literal["Y", "N"] = Field(
        "N", description="Whether to fetch only count (Y/N)"
    )

    # Optional parameters
    explanationGetFlg: Optional[Literal["Y", "N"]] = Field(
        "Y", description="Whether to fetch explanations (Y/N)"
    )
    annotationGetFlg: Optional[Literal["Y", "N"]] = Field(
        "Y", description="Whether to fetch annotations (Y/N)"
    )
    replaceSpChars: Literal["0", "1", "2", "3"] = Field(
        "2",
        description="Special character replacement mode | 0: do not replace, 1: replace, 2: NULL, 3: 'NA'",
    )

    # Data selection parameters
    lvTab: Optional[str] = Field(None, description="Table level")
    cdTab: Optional[str] = Field(None, description="Table code")
    cdTabFrom: Optional[str] = Field(None, description="Table code from")
    cdTabTo: Optional[str] = Field(None, description="Table code to")
    lvTime: Optional[str] = Field(None, description="Time level")
    cdTime: Optional[str] = Field(None, description="Time code")
    cdTimeFrom: Optional[str] = Field(None, description="Time code from")
    cdTimeTo: Optional[str] = Field(None, description="Time code to")
    lvArea: Optional[str] = Field(None, description="Area level")
    cdArea: Optional[str] = Field(None, description="Area code")
    cdAreaFrom: Optional[str] = Field(None, description="Area code from")
    cdAreaTo: Optional[str] = Field(None, description="Area code to")
    # ... see https://api.e-stat.go.jp/swagger-ui/e-statapi3.0.html#/

    # Pagination parameters
    limit: int = Field(
        100000, description="Maximum number of records to fetch per request"
    )
    maximum_offset: Optional[int] = Field(
        None, description="Maximum number of records to fetch"
    )

    # Category parameters (cat01-cat15)
    # Accepted as extra fields because they are generated dynamically
    model_config = ConfigDict(extra="allow")

    @field_validator("statsDataId")
    @classmethod
    def validate_stats_data_id(cls, v: Union[str, List[str]]) -> Union[str, List[str]]:
        """Ensure statsDataId is valid."""
        if isinstance(v, list):
            if not v:
                raise ValueError("statsDataId list cannot be empty")
            for id_ in v:
                if not isinstance(id_, str) or not id_.strip():
                    raise ValueError(f"Invalid statsDataId: {id_}")
        elif isinstance(v, str):
            if not v.strip():
                raise ValueError("statsDataId cannot be empty")
        return v

validate_stats_data_id(v) classmethod

Ensure statsDataId is valid.

Source code in src/estat_api_dlt_helper/config/models.py
@field_validator("statsDataId")
@classmethod
def validate_stats_data_id(cls, v: Union[str, List[str]]) -> Union[str, List[str]]:
    """Ensure statsDataId is valid."""
    if isinstance(v, list):
        if not v:
            raise ValueError("statsDataId list cannot be empty")
        for id_ in v:
            if not isinstance(id_, str) or not id_.strip():
                raise ValueError(f"Invalid statsDataId: {id_}")
    elif isinstance(v, str):
        if not v.strip():
            raise ValueError("statsDataId cannot be empty")
    return v
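The behavior of this validator can be exercised in isolation. The sketch below re-implements the same checks as a plain function (no pydantic), so the accept/reject cases are easy to see; `check_stats_data_id` is an illustrative name, not part of the library.

```python
# Stand-alone sketch of the statsDataId check performed by
# validate_stats_data_id above (plain function, no pydantic).
from typing import List, Union


def check_stats_data_id(v: Union[str, List[str]]) -> Union[str, List[str]]:
    if isinstance(v, list):
        if not v:
            raise ValueError("statsDataId list cannot be empty")
        for id_ in v:
            if not isinstance(id_, str) or not id_.strip():
                raise ValueError(f"Invalid statsDataId: {id_}")
    elif isinstance(v, str):
        if not v.strip():
            raise ValueError("statsDataId cannot be empty")
    return v


check_stats_data_id("0000020211")    # a single ID passes
check_stats_data_id(["0000020211"])  # a non-empty list of IDs passes
```

Empty strings, whitespace-only strings, empty lists, and non-string list elements are all rejected with a `ValueError`.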

DestinationConfig

Configuration class for the dlt destination (data load target). Defines dlt settings including the target data warehouse, dataset/table names, and write strategy.

Bases: BaseModel

Configuration for DLT data destination.

Defines parameters for loading data to various destinations using DLT, including destination type, dataset/table names, and write strategies.

Attributes:

Name Type Description
destination Union[str, Any]

DLT destination type or configuration object.

dataset_name str

Target dataset/schema name.

table_name str

Target table name.

write_disposition Literal['append', 'replace', 'merge']

How to write data (append/replace/merge).

primary_key Optional[Union[str, List[str]]]

Primary key columns for merge operations.

Source code in src/estat_api_dlt_helper/config/models.py
class DestinationConfig(BaseModel):
    """Configuration for DLT data destination.

    Defines parameters for loading data to various destinations using DLT,
    including destination type, dataset/table names, and write strategies.

    Attributes:
        destination: DLT destination type or configuration object.
        dataset_name: Target dataset/schema name.
        table_name: Target table name.
        write_disposition: How to write data (append/replace/merge).
        primary_key: Primary key columns for merge operations.
    """

    destination: Union[str, Any] = Field(
        ...,
        description="DLT destination configuration | e.g. 'bigquery', 'duckdb', 'filesystem', 'motherduck'",
    )
    dataset_name: str = Field(..., description="Dataset/schema name in the destination")
    table_name: str = Field(..., description="Table name in the destination")
    write_disposition: Literal["append", "replace", "merge"] = Field(
        "merge", description="How to write data to the destination table"
    )
    primary_key: Optional[Union[str, List[str]]] = Field(
        ["time", "area", "cat01"],
        description="Primary key column(s) for merge operations",
    )

    # DLT pipeline configuration
    pipeline_name: Optional[str] = Field(None, description="Name of the DLT pipeline")
    dev_mode: bool = Field(False, description="Enable DLT development mode")

    # Additional destination-specific configuration
    credentials: Optional[Dict[str, Any]] = Field(
        None, description="Destination-specific credentials"
    )
    extra_options: Optional[Dict[str, Any]] = Field(
        None, description="Additional destination-specific options"
    )

    @field_validator("primary_key")
    @classmethod
    def validate_primary_key(
        cls, v: Optional[Union[str, List[str]]], info
    ) -> Optional[Union[str, List[str]]]:
        """Validate primary key is provided for merge operations."""
        write_disposition = info.data.get("write_disposition")
        if write_disposition == "merge" and not v:
            raise ValueError(
                "primary_key must be specified when write_disposition is 'merge'"
            )
        return v

validate_primary_key(v, info) classmethod

Validate primary key is provided for merge operations.

Source code in src/estat_api_dlt_helper/config/models.py
@field_validator("primary_key")
@classmethod
def validate_primary_key(
    cls, v: Optional[Union[str, List[str]]], info
) -> Optional[Union[str, List[str]]]:
    """Validate primary key is provided for merge operations."""
    write_disposition = info.data.get("write_disposition")
    if write_disposition == "merge" and not v:
        raise ValueError(
            "primary_key must be specified when write_disposition is 'merge'"
        )
    return v
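The rule the validator enforces is small enough to state as a plain function: a merge write disposition requires a non-empty primary key, while append and replace do not. The sketch below uses an illustrative name (`check_primary_key`) and is not part of the library.

```python
# Sketch of the rule enforced by validate_primary_key above:
# write_disposition == "merge" requires a non-empty primary key.
from typing import List, Optional, Union


def check_primary_key(
    pk: Optional[Union[str, List[str]]], write_disposition: str
) -> Optional[Union[str, List[str]]]:
    if write_disposition == "merge" and not pk:
        raise ValueError(
            "primary_key must be specified when write_disposition is 'merge'"
        )
    return pk


check_primary_key(["time", "area", "cat01"], "merge")  # OK: key provided
check_primary_key(None, "append")                      # OK: no merge, no key needed
```

Note that in the pydantic validator, `info.data` only contains fields declared before `primary_key`, which is why `write_disposition` is declared first in the model.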

API Client

EstatApiClient

Client class for accessing the e-Stat API. Provides methods to fetch statistical data from the e-Stat API of Japan's official statistics portal, and handles API authentication, request formatting, and response parsing.

Client for accessing e-Stat API.

Provides methods to fetch statistical data from Japan's e-Stat API. Handles API authentication, request formatting, and response parsing.

Attributes:

Name Type Description
app_id

e-Stat API application ID for authentication.

base_url

Base URL for API endpoints.

timeout

Request timeout in seconds.

session

HTTP session for connection pooling.

Source code in src/estat_api_dlt_helper/api/client.py
class EstatApiClient:
    """Client for accessing e-Stat API.

    Provides methods to fetch statistical data from Japan's e-Stat API.
    Handles API authentication, request formatting, and response parsing.

    Attributes:
        app_id: e-Stat API application ID for authentication.
        base_url: Base URL for API endpoints.
        timeout: Request timeout in seconds.
        session: HTTP session for connection pooling.
    """

    def __init__(
        self,
        app_id: str,
        base_url: Optional[str] = None,
        timeout: int = 60,
    ):
        """Initialize e-Stat API client.

        Args:
            app_id: e-Stat API application ID
            base_url: Base URL for API (defaults to official endpoint)
            timeout: Request timeout in seconds
        """
        self.app_id = app_id
        self.base_url = base_url or ESTAT_ENDPOINTS["base_url"]
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({"accept": "application/json"})

    def _make_request(
        self, endpoint: str, params: Dict[str, Any], **kwargs: Any
    ) -> requests.Response:
        """Make HTTP request to e-Stat API.

        Args:
            endpoint: API endpoint name
            params: Query parameters
            **kwargs: Additional arguments for requests

        Returns:
            Response object
        """
        url = f"{self.base_url}{endpoint}"

        # Add appId to params
        params = {"appId": self.app_id, **params}

        logger.debug(f"Making request to {url} with params: {params}")

        response = self.session.get(url, params=params, timeout=self.timeout, **kwargs)

        response.raise_for_status()
        return response

    def get_stats_data(
        self,
        stats_data_id: str,
        start_position: int = 1,
        limit: int = 100000,
        meta_get_flg: str = "Y",
        cnt_get_flg: str = "N",
        explanation_get_flg: str = "Y",
        annotation_get_flg: str = "Y",
        replace_sp_chars: str = "0",
        lang: str = "J",
        **additional_params: Any,
    ) -> Dict[str, Any]:
        """Get statistical data from e-Stat API.

        Args:
            stats_data_id: Statistical data ID
            start_position: Start position for data retrieval (1-based)
            limit: Maximum number of records to retrieve
            meta_get_flg: Whether to get metadata (Y/N)
            cnt_get_flg: Whether to get count only (Y/N)
            explanation_get_flg: Whether to get explanations (Y/N)
            annotation_get_flg: Whether to get annotations (Y/N)
            replace_sp_chars: Replace special characters (0: do not replace, 1: replace, 2: NULL, 3: 'NA')
            lang: Language (J: Japanese, E: English)
            **additional_params: Additional query parameters

        Returns:
            API response as dictionary
        """
        params = {
            "statsDataId": stats_data_id,
            "startPosition": start_position,
            "limit": limit,
            "metaGetFlg": meta_get_flg,
            "cntGetFlg": cnt_get_flg,
            "explanationGetFlg": explanation_get_flg,
            "annotationGetFlg": annotation_get_flg,
            "replaceSpChars": replace_sp_chars,
            "lang": lang,
            **additional_params,
        }

        response = self._make_request(ESTAT_ENDPOINTS["stats_data"], params)
        return response.json()

    def get_stats_data_generator(
        self, stats_data_id: str, limit_per_request: int = 100000, **kwargs: Any
    ) -> Generator[Dict[str, Any], None, None]:
        """Get statistical data as a generator for pagination.

        Args:
            stats_data_id: Statistical data ID
            limit_per_request: Number of records per request
            **kwargs: Additional parameters for get_stats_data

        Yields:
            Response data for each page
        """
        start_position = 1

        while True:
            response_data = self.get_stats_data(
                stats_data_id=stats_data_id,
                start_position=start_position,
                limit=limit_per_request,
                **kwargs,
            )

            # Extract data info
            stats_data = response_data.get("GET_STATS_DATA", {})
            statistical_data = stats_data.get("STATISTICAL_DATA", {})
            result_inf = statistical_data.get("RESULT_INF", {})

            # Get total number of records
            total_number = int(result_inf.get("TOTAL_NUMBER", 0))
            from_number = int(result_inf.get("FROM_NUMBER", 0))
            to_number = int(result_inf.get("TO_NUMBER", 0))

            logger.info(
                f"Retrieved records {from_number} to {to_number} of {total_number}"
            )

            yield response_data

            # Check if we've retrieved all records
            if to_number >= total_number:
                break

            # Update start position for next request
            start_position = to_number + 1

    def get_stats_list(
        self,
        search_word: Optional[str] = None,
        survey_years: Optional[str] = None,
        stats_code: Optional[str] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Get list of available statistics.

        Args:
            search_word: Search keyword
            survey_years: Survey years (YYYY or YYYYMM-YYYYMM)
            stats_code: Statistics code
            **kwargs: Additional query parameters

        Returns:
            API response as dictionary
        """
        params = {}

        if search_word:
            params["searchWord"] = search_word
        if survey_years:
            params["surveyYears"] = survey_years
        if stats_code:
            params["statsCode"] = stats_code

        params.update(kwargs)

        response = self._make_request(ESTAT_ENDPOINTS["stats_list"], params)
        return response.json()

    def close(self) -> None:
        """Close the session."""
        self.session.close()

__init__(app_id, base_url=None, timeout=60)

Initialize e-Stat API client.

Parameters:

Name Type Description Default
app_id str

e-Stat API application ID

required
base_url Optional[str]

Base URL for API (defaults to official endpoint)

None
timeout int

Request timeout in seconds

60
Source code in src/estat_api_dlt_helper/api/client.py
def __init__(
    self,
    app_id: str,
    base_url: Optional[str] = None,
    timeout: int = 60,
):
    """Initialize e-Stat API client.

    Args:
        app_id: e-Stat API application ID
        base_url: Base URL for API (defaults to official endpoint)
        timeout: Request timeout in seconds
    """
    self.app_id = app_id
    self.base_url = base_url or ESTAT_ENDPOINTS["base_url"]
    self.timeout = timeout
    self.session = requests.Session()
    self.session.headers.update({"accept": "application/json"})

close()

Close the session.

Source code in src/estat_api_dlt_helper/api/client.py
def close(self) -> None:
    """Close the session."""
    self.session.close()

get_stats_data(stats_data_id, start_position=1, limit=100000, meta_get_flg='Y', cnt_get_flg='N', explanation_get_flg='Y', annotation_get_flg='Y', replace_sp_chars='0', lang='J', **additional_params)

Get statistical data from e-Stat API.

Parameters:

Name Type Description Default
stats_data_id str

Statistical data ID

required
start_position int

Start position for data retrieval (1-based)

1
limit int

Maximum number of records to retrieve

100000
meta_get_flg str

Whether to get metadata (Y/N)

'Y'
cnt_get_flg str

Whether to get count only (Y/N)

'N'
explanation_get_flg str

Whether to get explanations (Y/N)

'Y'
annotation_get_flg str

Whether to get annotations (Y/N)

'Y'
replace_sp_chars str

Replace special characters (0: do not replace, 1: replace, 2: NULL, 3: 'NA')

'0'
lang str

Language (J: Japanese, E: English)

'J'
**additional_params Any

Additional query parameters

{}

Returns:

Type Description
Dict[str, Any]

API response as dictionary

Source code in src/estat_api_dlt_helper/api/client.py
def get_stats_data(
    self,
    stats_data_id: str,
    start_position: int = 1,
    limit: int = 100000,
    meta_get_flg: str = "Y",
    cnt_get_flg: str = "N",
    explanation_get_flg: str = "Y",
    annotation_get_flg: str = "Y",
    replace_sp_chars: str = "0",
    lang: str = "J",
    **additional_params: Any,
) -> Dict[str, Any]:
    """Get statistical data from e-Stat API.

    Args:
        stats_data_id: Statistical data ID
        start_position: Start position for data retrieval (1-based)
        limit: Maximum number of records to retrieve
        meta_get_flg: Whether to get metadata (Y/N)
        cnt_get_flg: Whether to get count only (Y/N)
        explanation_get_flg: Whether to get explanations (Y/N)
        annotation_get_flg: Whether to get annotations (Y/N)
        replace_sp_chars: Replace special characters (0: do not replace, 1: replace, 2: NULL, 3: 'NA')
        lang: Language (J: Japanese, E: English)
        **additional_params: Additional query parameters

    Returns:
        API response as dictionary
    """
    params = {
        "statsDataId": stats_data_id,
        "startPosition": start_position,
        "limit": limit,
        "metaGetFlg": meta_get_flg,
        "cntGetFlg": cnt_get_flg,
        "explanationGetFlg": explanation_get_flg,
        "annotationGetFlg": annotation_get_flg,
        "replaceSpChars": replace_sp_chars,
        "lang": lang,
        **additional_params,
    }

    response = self._make_request(ESTAT_ENDPOINTS["stats_data"], params)
    return response.json()
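The query parameters sent by `get_stats_data` are a plain dictionary merge: the named arguments fill the standard keys, and `**additional_params` layers on anything else, such as the area- or time-selection keys documented on SourceConfig. A stdlib sketch of that merge (`build_params` is an illustrative helper, and the `cdArea` value is a hypothetical area code):

```python
# Sketch of how get_stats_data assembles its query parameters;
# the appId itself is merged in later by _make_request.
from typing import Any, Dict


def build_params(
    stats_data_id: str,
    start_position: int = 1,
    limit: int = 100000,
    **additional_params: Any,
) -> Dict[str, Any]:
    return {
        "statsDataId": stats_data_id,
        "startPosition": start_position,
        "limit": limit,
        **additional_params,  # e.g. cdArea, cdTime, lang overrides
    }


params = build_params("0000020211", cdArea="01000", lang="J")
```

Because `**additional_params` is spread last, an extra keyword with the same name as a standard key would override it, which is how callers can pass any parameter the e-Stat API accepts.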

get_stats_data_generator(stats_data_id, limit_per_request=100000, **kwargs)

Get statistical data as a generator for pagination.

Parameters:

Name Type Description Default
stats_data_id str

Statistical data ID

required
limit_per_request int

Number of records per request

100000
**kwargs Any

Additional parameters for get_stats_data

{}

Yields:

Type Description
Dict[str, Any]

Response data for each page

Source code in src/estat_api_dlt_helper/api/client.py
def get_stats_data_generator(
    self, stats_data_id: str, limit_per_request: int = 100000, **kwargs: Any
) -> Generator[Dict[str, Any], None, None]:
    """Get statistical data as a generator for pagination.

    Args:
        stats_data_id: Statistical data ID
        limit_per_request: Number of records per request
        **kwargs: Additional parameters for get_stats_data

    Yields:
        Response data for each page
    """
    start_position = 1

    while True:
        response_data = self.get_stats_data(
            stats_data_id=stats_data_id,
            start_position=start_position,
            limit=limit_per_request,
            **kwargs,
        )

        # Extract data info
        stats_data = response_data.get("GET_STATS_DATA", {})
        statistical_data = stats_data.get("STATISTICAL_DATA", {})
        result_inf = statistical_data.get("RESULT_INF", {})

        # Get total number of records
        total_number = int(result_inf.get("TOTAL_NUMBER", 0))
        from_number = int(result_inf.get("FROM_NUMBER", 0))
        to_number = int(result_inf.get("TO_NUMBER", 0))

        logger.info(
            f"Retrieved records {from_number} to {to_number} of {total_number}"
        )

        yield response_data

        # Check if we've retrieved all records
        if to_number >= total_number:
            break

        # Update start position for next request
        start_position = to_number + 1

get_stats_list(search_word=None, survey_years=None, stats_code=None, **kwargs)

Get list of available statistics.

Parameters:

Name Type Description Default
search_word Optional[str]

Search keyword

None
survey_years Optional[str]

Survey years (YYYY or YYYYMM-YYYYMM)

None
stats_code Optional[str]

Statistics code

None
**kwargs Any

Additional query parameters

{}

Returns:

Type Description
Dict[str, Any]

API response as dictionary

Source code in src/estat_api_dlt_helper/api/client.py
def get_stats_list(
    self,
    search_word: Optional[str] = None,
    survey_years: Optional[str] = None,
    stats_code: Optional[str] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Get list of available statistics.

    Args:
        search_word: Search keyword
        survey_years: Survey years (YYYY or YYYYMM-YYYYMM)
        stats_code: Statistics code
        **kwargs: Additional query parameters

    Returns:
        API response as dictionary
    """
    params = {}

    if search_word:
        params["searchWord"] = search_word
    if survey_years:
        params["surveyYears"] = survey_years
    if stats_code:
        params["statsCode"] = stats_code

    params.update(kwargs)

    response = self._make_request(ESTAT_ENDPOINTS["stats_list"], params)
    return response.json()

Data Parsing

parse_response

A function that parses e-Stat API responses and converts them to Arrow format. It takes the JSON response and returns a structured Arrow table containing data values and associated metadata.

Parse e-Stat API response data and convert to Arrow table.

This is the main entry point for parsing e-Stat API responses. Takes the JSON response and returns a structured Arrow table with data values and associated metadata.

Parameters:

Name Type Description Default
data Dict[str, Any]

The complete JSON response from e-Stat API

required

Returns:

Type Description
Table

pa.Table: Arrow table containing the parsed data with metadata

Raises:

Type Description
ValueError

If required data sections are missing

KeyError

If expected keys are not found in the response

Source code in src/estat_api_dlt_helper/parser/response_parser.py
def parse_response(data: Dict[str, Any]) -> pa.Table:
    """
    Parse e-Stat API response data and convert to Arrow table.

    This is the main entry point for parsing e-Stat API responses.
    Takes the JSON response and returns a structured Arrow table with
    data values and associated metadata.

    Args:
        data: The complete JSON response from e-Stat API

    Returns:
        pa.Table: Arrow table containing the parsed data with metadata

    Raises:
        ValueError: If required data sections are missing
        KeyError: If expected keys are not found in the response
    """
    # Validate response structure
    if "GET_STATS_DATA" not in data:
        raise ValueError("Invalid response: missing GET_STATS_DATA section")

    stats_data = data["GET_STATS_DATA"]

    if "STATISTICAL_DATA" not in stats_data:
        raise ValueError("Invalid response: missing STATISTICAL_DATA section")

    statistical_data = stats_data["STATISTICAL_DATA"]

    # Check for required sections
    required_sections = ["TABLE_INF", "CLASS_INF", "DATA_INF"]
    missing_sections = [
        section for section in required_sections if section not in statistical_data
    ]

    if missing_sections:
        raise ValueError(
            f"Invalid response: missing required sections: {', '.join(missing_sections)}"
        )

    # Validate DATA_INF has VALUE
    if "VALUE" not in statistical_data["DATA_INF"]:
        raise ValueError("Invalid response: DATA_INF missing VALUE section")

    # Create processors
    metadata_processor = MetadataProcessor()
    arrow_converter = ArrowConverter(metadata_processor)

    # Convert to Arrow table
    return arrow_converter.convert_to_arrow(statistical_data)
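The response skeleton `parse_response` expects can be summarized as a set of nested required sections. The stdlib sketch below mirrors only the validation steps above, not the Arrow conversion; `validate_response_shape` is an illustrative name, not a library function.

```python
# Stand-alone sketch of the structure checks in parse_response.
from typing import Any, Dict


def validate_response_shape(data: Dict[str, Any]) -> Dict[str, Any]:
    if "GET_STATS_DATA" not in data:
        raise ValueError("Invalid response: missing GET_STATS_DATA section")
    statistical_data = data["GET_STATS_DATA"].get("STATISTICAL_DATA")
    if statistical_data is None:
        raise ValueError("Invalid response: missing STATISTICAL_DATA section")
    missing = [
        s for s in ("TABLE_INF", "CLASS_INF", "DATA_INF")
        if s not in statistical_data
    ]
    if missing:
        raise ValueError(
            f"Invalid response: missing required sections: {', '.join(missing)}"
        )
    if "VALUE" not in statistical_data["DATA_INF"]:
        raise ValueError("Invalid response: DATA_INF missing VALUE section")
    return statistical_data


ok = validate_response_shape({
    "GET_STATS_DATA": {"STATISTICAL_DATA": {
        "TABLE_INF": {}, "CLASS_INF": {}, "DATA_INF": {"VALUE": []},
    }}
})
```

A response missing any of `TABLE_INF`, `CLASS_INF`, or `DATA_INF` (or a `DATA_INF` without `VALUE`) is rejected before any conversion work begins.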

Data Loader Functions

load_estat_data

A convenience function that loads e-Stat API data to the specified destination. It creates and runs a dlt pipeline with the provided configuration.

Load e-Stat API data to the specified destination using DLT.

This is a convenience function that creates and runs a DLT pipeline with the provided configuration.

Parameters:

Name Type Description Default
config EstatDltConfig

Configuration for e-Stat API source and DLT destination

required
credentials Optional[Dict[str, Any]]

Optional credentials to override destination credentials

None
**kwargs Any

Additional arguments passed to pipeline.run()

{}

Returns:

Type Description
Any

LoadInfo object containing information about the load operation

Example
from estat_api_dlt_helper import EstatDltConfig, load_estat_data

config = {
    "source": {
        "app_id": "YOUR_API_KEY",
        "statsDataId": "0000020211",
        "limit": 10
    },
    "destination": {
        "destination": "duckdb",
        "dataset_name": "demo",
        "table_name": "demo",
        "write_disposition": "merge",
        "primary_key": ["time", "area", "cat01"]
    }
}

config = EstatDltConfig(**config)
info = load_estat_data(config)
print(info)
Source code in src/estat_api_dlt_helper/loader/load_manager.py
def load_estat_data(
    config: EstatDltConfig,
    *,
    credentials: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Any:  # dlt.common.pipeline.LoadInfo
    """
    Load e-Stat API data to the specified destination using DLT.

    This is a convenience function that creates and runs a DLT pipeline
    with the provided configuration.

    Args:
        config: Configuration for e-Stat API source and DLT destination
        credentials: Optional credentials to override destination credentials
        **kwargs: Additional arguments passed to pipeline.run()

    Returns:
        LoadInfo object containing information about the load operation

    Example:
        ```python
        from estat_api_dlt_helper import EstatDltConfig, load_estat_data

        config = {
            "source": {
                "app_id": "YOUR_API_KEY",
                "statsDataId": "0000020211",
                "limit": 10
            },
            "destination": {
                "destination": "duckdb",
                "dataset_name": "demo",
                "table_name": "demo",
                "write_disposition": "merge",
                "primary_key": ["time", "area", "cat01"]
            }
        }

        config = EstatDltConfig(**config)
        info = load_estat_data(config)
        print(info)
        ```
    """
    logger.info("Starting e-Stat data load process")

    try:
        # Override credentials if provided
        if credentials:
            config.destination.credentials = credentials

        # Create the resource
        logger.debug("Creating e-Stat resource")
        resource = create_estat_resource(config)

        # Create the pipeline
        logger.debug("Creating DLT pipeline")
        pipeline = create_estat_pipeline(config)

        # Run the pipeline
        logger.info(
            f"Running pipeline for stats_data_id: {config.source.statsDataId} "
            f"to {config.destination.destination}/{config.destination.dataset_name}/"
            f"{config.destination.table_name}"
        )

        info = pipeline.run(resource, **kwargs)

        # Log results
        logger.info(f"Load completed: {info}")

        return info

    except Exception as e:
        logger.error(f"Error during data load: {e}")
        raise

create_estat_resource

A function that creates a dlt resource for e-Stat API data. It builds a customizable dlt resource that fetches data from the e-Stat API based on the provided configuration.

Create a DLT resource for e-Stat API data.

This function creates a customizable DLT resource that fetches data from the e-Stat API based on the provided configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | EstatDltConfig | Configuration for e-Stat API source and destination | required |
| name | Optional[str] | Resource name (defaults to table_name from config) | None |
| primary_key | Optional[Any] | Primary key columns (overrides config if provided) | None |
| write_disposition | Optional[str] | Write disposition (overrides config if provided) | None |
| columns | Optional[Any] | Column definitions for the resource | None |
| table_format | Optional[str] | Table format for certain destinations | None |
| file_format | Optional[str] | File format for filesystem destinations | None |
| schema_contract | Optional[Any] | Schema contract settings | None |
| table_name | Optional[Callable[[Any], str]] | Callable to generate dynamic table names | None |
| max_table_nesting | Optional[int] | Maximum nesting level for nested data | None |
| selected | Optional[bool] | Whether this resource is selected for loading | None |
| merge_key | Optional[Any] | Merge key for merge operations | None |
| parallelized | Optional[bool] | Whether to parallelize this resource | None |
| **resource_kwargs | Any | Additional keyword arguments for dlt.resource | {} |

Returns:

| Type | Description |
| --- | --- |
| Any | dlt.Resource: Configured DLT resource for e-Stat API data |

Example:

```python
from estat_api_dlt_helper import EstatDltConfig, create_estat_resource

config = EstatDltConfig(...)
resource = create_estat_resource(config)

# Customize the resource
resource = create_estat_resource(
    config,
    name="custom_stats",
    columns={"time": {"data_type": "timestamp"}},
    selected=True
)
```
Source code in src/estat_api_dlt_helper/loader/dlt_resource.py
def create_estat_resource(
    config: EstatDltConfig,
    *,
    name: Optional[str] = None,
    primary_key: Optional[Any] = None,
    write_disposition: Optional[str] = None,
    columns: Optional[Any] = None,
    table_format: Optional[str] = None,
    file_format: Optional[str] = None,
    schema_contract: Optional[Any] = None,
    table_name: Optional[Callable[[Any], str]] = None,
    max_table_nesting: Optional[int] = None,
    selected: Optional[bool] = None,
    merge_key: Optional[Any] = None,
    parallelized: Optional[bool] = None,
    **resource_kwargs: Any,
) -> Any:  # dlt.Resource
    """
    Create a DLT resource for e-Stat API data.

    This function creates a customizable DLT resource that fetches data
    from the e-Stat API based on the provided configuration.

    Args:
        config: Configuration for e-Stat API source and destination
        name: Resource name (defaults to table_name from config)
        primary_key: Primary key columns (overrides config if provided)
        write_disposition: Write disposition (overrides config if provided)
        columns: Column definitions for the resource
        table_format: Table format for certain destinations
        file_format: File format for filesystem destinations
        schema_contract: Schema contract settings
        table_name: Callable to generate dynamic table names
        max_table_nesting: Maximum nesting level for nested data
        selected: Whether this resource is selected for loading
        merge_key: Merge key for merge operations
        parallelized: Whether to parallelize this resource
        **resource_kwargs: Additional keyword arguments for dlt.resource

    Returns:
        dlt.Resource: Configured DLT resource for e-Stat API data

    Example:
        ```python
        from estat_api_dlt_helper import EstatDltConfig, create_estat_resource

        config = EstatDltConfig(...)
        resource = create_estat_resource(config)

        # Customize the resource
        resource = create_estat_resource(
            config,
            name="custom_stats",
            columns={"time": {"data_type": "timestamp"}},
            selected=True
        )
        ```
    """
    # Prepare API parameters
    api_params = _create_api_params(config)

    # Get stats data IDs (ensure it's a list)
    stats_data_ids = config.source.statsDataId
    if isinstance(stats_data_ids, str):
        stats_data_ids = [stats_data_ids]

    # Prepare resource configuration
    resource_config: Dict[str, Any] = {
        "name": name or config.destination.table_name,
        "write_disposition": write_disposition or config.destination.write_disposition,
        # Allow schema evolution for handling different metadata structures
        "schema_contract": schema_contract
        or {
            "tables": "evolve",
            "columns": "evolve",  # Allow new columns like parent_code in time_metadata
            "data_type": "freeze",  # Keep data types consistent
        },
    }

    # Add primary key for merge disposition
    if primary_key is not None:
        resource_config["primary_key"] = primary_key
    elif (
        config.destination.write_disposition == "merge"
        and config.destination.primary_key
    ):
        pk = config.destination.primary_key
        if isinstance(pk, str):
            pk = [pk]
        resource_config["primary_key"] = pk

    # Add optional resource parameters
    optional_params = {
        "columns": columns,
        "table_format": table_format,
        "file_format": file_format,
        "schema_contract": schema_contract,
        "table_name": table_name,
        "max_table_nesting": max_table_nesting,
        "selected": selected,
        "merge_key": merge_key,
        "parallelized": parallelized,
    }

    for key, value in optional_params.items():
        if value is not None:
            resource_config[key] = value

    # Add any additional resource kwargs
    resource_config.update(resource_kwargs)

    @dlt.resource(**resource_config)  # type: ignore
    def estat_data() -> Generator[pa.Table, None, None]:
        """Generator function for e-Stat data."""
        client = EstatApiClient(app_id=config.source.app_id)

        try:
            # Process each stats data ID
            for stats_data_id in stats_data_ids:
                yield from _fetch_estat_data(
                    client=client,
                    stats_data_id=stats_data_id,
                    params=api_params,
                    limit=config.source.limit,
                    maximum_offset=config.source.maximum_offset,
                )
        finally:
            client.close()

    return estat_data()
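
The primary-key precedence applied inside `create_estat_resource` — an explicit `primary_key` argument wins; otherwise `config.destination.primary_key` is used, but only for the `merge` write disposition, with a bare string normalized to a one-element list — can be sketched in isolation. The helper name below is illustrative, not part of the library:

```python
from typing import Any, List, Optional, Union


def resolve_primary_key(
    override: Optional[Any],
    write_disposition: str,
    config_pk: Optional[Union[str, List[str]]],
) -> Optional[Any]:
    """Mirror of the primary-key precedence in create_estat_resource."""
    if override is not None:
        return override  # explicit argument is passed through unchanged
    if write_disposition == "merge" and config_pk:
        # a bare string from config is normalized to a single-element list
        return [config_pk] if isinstance(config_pk, str) else config_pk
    return None  # no primary key is attached to the resource


# Explicit argument wins over the config value
print(resolve_primary_key(["id"], "merge", ["time", "area"]))  # ['id']
# Config value applies only to the merge disposition
print(resolve_primary_key(None, "append", ["time", "area"]))   # None
print(resolve_primary_key(None, "merge", "time"))              # ['time']
```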

create_estat_pipeline

A function that creates a dlt pipeline for loading e-Stat API data. It builds a customizable dlt pipeline configured for the specified destination based on the provided configuration.

Create a DLT pipeline for e-Stat API data loading.

This function creates a customizable DLT pipeline configured for the specified destination based on the provided configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | EstatDltConfig | Configuration for e-Stat API source and destination | required |
| pipeline_name | Optional[str] | Name of the pipeline (overrides config if provided) | None |
| pipelines_dir | Optional[str] | Directory to store pipeline state | None |
| dataset_name | Optional[str] | Dataset name in destination (overrides config if provided) | None |
| import_schema_path | Optional[str] | Path to import schema from | None |
| export_schema_path | Optional[str] | Path to export schema to | None |
| dev_mode | Optional[bool] | Development mode (overrides config if provided) | None |
| refresh | Optional[str] | Schema refresh mode | None |
| progress | Optional[str] | Progress reporting configuration | None |
| destination | Optional[Any] | DLT destination (constructed from config if not provided) | None |
| staging | Optional[Any] | Staging destination for certain loaders | None |
| **pipeline_kwargs | Any | Additional keyword arguments for dlt.pipeline | {} |

Returns:

| Type | Description |
| --- | --- |
| Any | dlt.Pipeline: Configured DLT pipeline |

Example:

```python
from estat_api_dlt_helper import EstatDltConfig, create_estat_pipeline

config = EstatDltConfig(...)
pipeline = create_estat_pipeline(config)

# Customize the pipeline
pipeline = create_estat_pipeline(
    config,
    pipeline_name="custom_estat_pipeline",
    dev_mode=True,
    progress="log"
)
```
Source code in src/estat_api_dlt_helper/loader/dlt_pipeline.py
def create_estat_pipeline(
    config: EstatDltConfig,
    *,
    pipeline_name: Optional[str] = None,
    pipelines_dir: Optional[str] = None,
    dataset_name: Optional[str] = None,
    import_schema_path: Optional[str] = None,
    export_schema_path: Optional[str] = None,
    dev_mode: Optional[bool] = None,
    refresh: Optional[str] = None,
    progress: Optional[str] = None,
    destination: Optional[Any] = None,
    staging: Optional[Any] = None,
    **pipeline_kwargs: Any,
) -> Any:  # dlt.Pipeline
    """
    Create a DLT pipeline for e-Stat API data loading.

    This function creates a customizable DLT pipeline configured for
    the specified destination based on the provided configuration.

    Args:
        config: Configuration for e-Stat API source and destination
        pipeline_name: Name of the pipeline (overrides config if provided)
        pipelines_dir: Directory to store pipeline state
        dataset_name: Dataset name in destination (overrides config if provided)
        import_schema_path: Path to import schema from
        export_schema_path: Path to export schema to
        dev_mode: Development mode (overrides config if provided)
        refresh: Schema refresh mode
        progress: Progress reporting configuration
        destination: DLT destination (constructed from config if not provided)
        staging: Staging destination for certain loaders
        **pipeline_kwargs: Additional keyword arguments for dlt.pipeline

    Returns:
        dlt.Pipeline: Configured DLT pipeline

    Example:
        ```python
        from estat_api_dlt_helper import EstatDltConfig, create_estat_pipeline

        config = EstatDltConfig(...)
        pipeline = create_estat_pipeline(config)

        # Customize the pipeline
        pipeline = create_estat_pipeline(
            config,
            pipeline_name="custom_estat_pipeline",
            dev_mode=True,
            progress="log"
        )
        ```
    """
    # Determine pipeline name
    name = pipeline_name or config.destination.pipeline_name
    if not name:
        # Generate default pipeline name
        stats_id = config.source.statsDataId
        if isinstance(stats_id, list):
            stats_id = "_".join(stats_id[:3])  # Limit to first 3 IDs
            if len(config.source.statsDataId) > 3:
                stats_id += "_etc"

        name = f"estat_{config.destination.dataset_name}_{stats_id}"

    # Prepare destination configuration
    dest = destination or config.destination.destination

    # Handle destination-specific configurations
    if isinstance(dest, str):
        # String destination name
        if config.destination.credentials:
            # Create destination with credentials
            # For now, just use the string destination name
            # DLT will handle the destination creation internally
            pass

    # Prepare pipeline configuration
    pipeline_config = {
        "pipeline_name": name,
        "destination": dest,
        "dataset_name": dataset_name or config.destination.dataset_name,
    }

    # Add optional parameters with proper defaults
    if dev_mode is not None:
        pipeline_config["dev_mode"] = dev_mode
    elif config.destination.dev_mode is not None:
        pipeline_config["dev_mode"] = config.destination.dev_mode

    # Add other optional parameters
    optional_params = {
        "pipelines_dir": pipelines_dir,
        "import_schema_path": import_schema_path,
        "export_schema_path": export_schema_path,
        "refresh": refresh,
        "progress": progress,
        "staging": staging,
    }

    for key, value in optional_params.items():
        if value is not None:
            pipeline_config[key] = value

    # Add any additional pipeline kwargs
    pipeline_config.update(pipeline_kwargs)

    # Create and return the pipeline
    logger.info(
        f"Creating pipeline '{name}' for destination '{dest}' "
        f"with dataset '{pipeline_config['dataset_name']}'"
    )

    return dlt.pipeline(**pipeline_config)
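
When neither the `pipeline_name` argument nor `config.destination.pipeline_name` is set, the default name is derived from the dataset name and the statistics ID(s), truncating a list to its first three IDs and appending `_etc` when more were given. The naming rule can be reproduced as a standalone sketch (the function name is illustrative, not part of the library):

```python
from typing import List, Union


def default_pipeline_name(
    dataset_name: str, stats_data_id: Union[str, List[str]]
) -> str:
    """Mirror of the default naming rule in create_estat_pipeline."""
    stats_id = stats_data_id
    if isinstance(stats_id, list):
        joined = "_".join(stats_id[:3])  # keep at most the first 3 IDs
        if len(stats_id) > 3:
            joined += "_etc"  # flag that IDs were truncated
        stats_id = joined
    return f"estat_{dataset_name}_{stats_id}"


print(default_pipeline_name("demo", "0000020211"))
# estat_demo_0000020211
print(default_pipeline_name("demo", ["a", "b", "c", "d"]))
# estat_demo_a_b_c_etc
```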