はじめに
データレイクのテーブルフォーマット Apache Iceberg というものがあると知りました。
Icebergは、大規模な分析テーブル向けの高性能フォーマットです。IcebergはSQLテーブルの信頼性と簡便性をビッグデータにもたらすと同時に、Spark、Trino、Flink、Presto、Hive、Impalaといったエンジンが、同じテーブルを同時に安全に操作できるようにします。
弊社プロダクトの中には大量の電力使用量データを扱うものもありParquetファイルを利用しているのですが、電力需要家ごとにParquetファイルを分割し、編集時にはPythonでParquet ファイルを直接更新しています。
Apache Iceberg を使うことによりこの辺りの管理を上手くできないかと考えました。
そこで今回は、「XMLフォーマットで提供される電力使用量データ」を題材に、AWS Glue と Iceberg を使った分析基盤を構築していきたいと思います。
やること
XMLフォーマットで提供される電力使用量データを、AWS Glue(PySpark)でパースしてIceberg テーブルに取り込み(実際のファイルは Parquet ファイル)、Athena でデータ分析できるようにする。
アーキテクチャ概要
データの流れは以下の通りです。
- XMLファイルが S3 (
s3://consumption-iceberg/xml/consumption/) にアップロードされる - EventBridge が検知し、Glue Workflow を起動
- Glue Workflow が Glue Job を実行
- Glue ジョブが処理を実行
- XMLをネイティブに読み込み
- ネストされた構造をフラットなテーブル形式(横持ち)に変換
- Iceberg テーブルに
MERGE INTOで UPSERT(挿入・更新)
- S3 上の Iceberg テーブル (
s3://consumption-iceberg/iceberg/consumption/) - Athena から標準SQLでクエリ
サンプルデータ仕様
電力使用量データフォーマットは、検証用に作成した簡易な扱いやすいものです。
- 1つのXMLファイルに複数の需要家分の電力使用量データが含まれます。
- 需要家ID (external_id): STRING
- 対象日 (target_date): DATE
- 更新日時 (updated_at): TIMESTAMP
- 30分ごとの電力使用量: 48コマ (00:00, 00:30, ... 23:30)
- ファイル配置:
s3://consumption-iceberg/xml/consumption/YYYYMMDD/consumption.xml
サンプル XML 構造
<?xml version="1.0" encoding="UTF-8"?> <electricity_usage_report> <record> <external_id>CUST-001234</external_id> <target_date>2024-01-15</target_date> <updated_at>2024-01-16T06:30:00+09:00</updated_at> <usage_kwh> <slot time="00:00">1.2</slot> <slot time="00:30">1.1</slot> <!-- ... 48コマ ... --> <slot time="23:30">1.3</slot> </usage_kwh> </record> <record> <external_id>CUST-001235</external_id> <target_date>2024-01-15</target_date> <updated_at>2024-01-16T06:30:00+09:00</updated_at> <usage_kwh> <slot time="00:00">1.2</slot> <slot time="00:30">1.1</slot> <!-- ... 48コマ ... --> <slot time="23:30">1.3</slot> </usage_kwh> </record> </electricity_usage_report>
テーブル設計
AWS Console の Amazon Athena で実施していきます。
1. Athena Workgroup の作成
- ワークグループ名 : consumption-iceberg-workgroup
- 分析エンジン タイプ: Athena SQL
2. データベース 作成
クエリエディタで、ワークグループ名 : consumption-iceberg-workgroup を選択して、以下のCREATE 文を実行します。
CREATE DATABASE IF NOT EXISTS consumption_iceberg_db LOCATION 's3://consumption-iceberg/iceberg/'
3. 電力使用量テーブル
ワークグループ名 : consumption-iceberg-workgroup に consumption_iceberg_db データベースができたら、電力使用量テーブルを作成します。
CREATE TABLE consumption ( external_id STRING, target_date DATE, kwh_0000 DECIMAL(10,2), kwh_0030 DECIMAL(10,2), kwh_0100 DECIMAL(10,2), kwh_0130 DECIMAL(10,2), kwh_0200 DECIMAL(10,2), kwh_0230 DECIMAL(10,2), kwh_0300 DECIMAL(10,2), kwh_0330 DECIMAL(10,2), kwh_0400 DECIMAL(10,2), kwh_0430 DECIMAL(10,2), kwh_0500 DECIMAL(10,2), kwh_0530 DECIMAL(10,2), kwh_0600 DECIMAL(10,2), kwh_0630 DECIMAL(10,2), kwh_0700 DECIMAL(10,2), kwh_0730 DECIMAL(10,2), kwh_0800 DECIMAL(10,2), kwh_0830 DECIMAL(10,2), kwh_0900 DECIMAL(10,2), kwh_0930 DECIMAL(10,2), kwh_1000 DECIMAL(10,2), kwh_1030 DECIMAL(10,2), kwh_1100 DECIMAL(10,2), kwh_1130 DECIMAL(10,2), kwh_1200 DECIMAL(10,2), kwh_1230 DECIMAL(10,2), kwh_1300 DECIMAL(10,2), kwh_1330 DECIMAL(10,2), kwh_1400 DECIMAL(10,2), kwh_1430 DECIMAL(10,2), kwh_1500 DECIMAL(10,2), kwh_1530 DECIMAL(10,2), kwh_1600 DECIMAL(10,2), kwh_1630 DECIMAL(10,2), kwh_1700 DECIMAL(10,2), kwh_1730 DECIMAL(10,2), kwh_1800 DECIMAL(10,2), kwh_1830 DECIMAL(10,2), kwh_1900 DECIMAL(10,2), kwh_1930 DECIMAL(10,2), kwh_2000 DECIMAL(10,2), kwh_2030 DECIMAL(10,2), kwh_2100 DECIMAL(10,2), kwh_2130 DECIMAL(10,2), kwh_2200 DECIMAL(10,2), kwh_2230 DECIMAL(10,2), kwh_2300 DECIMAL(10,2), kwh_2330 DECIMAL(10,2), updated_at TIMESTAMP, loaded_at TIMESTAMP ) PARTITIONED BY (target_date) LOCATION 's3://consumption-iceberg/iceberg/consumption/' TBLPROPERTIES ( 'table_type' = 'ICEBERG', 'format' = 'parquet', 'write_compression' = 'snappy' );
CREATE文には、スキーマ定義部分の他に以下を指定します。
テーブルプロパティ
| プロパティ | 説明 | 設定値 |
|---|---|---|
table_type |
テーブルタイプをIcebergとして指定 | ICEBERG |
format |
データファイルのフォーマットを指定 | parquet |
write_compression |
Parquetファイルの圧縮コーデックを指定 | snappy |
parquetファイルのパーティション
Parquet ファイルの物理的な配置方法を指定します。 今回は、target_date を対象とします。
| 設定値 | |
|---|---|
PARTITIONED BY () |
target_date |
parquetファイルの保存先
| 設定値 | |
|---|---|
LOCATION |
s3://consumption-iceberg/iceberg/consumption/ |
ここまでで電力使用量を保存するするためのテーブル作成は完了しました。
次にXMLファイルを処理するための仕組みを構築していきましょう。
AWS Glue ジョブ設計
AWSコンソールでのジョブ作成手順
1. Glue Jobの作成
- AWS Management Console にログイン
- AWS Glue サービスを開く
- 左メニューから ETL Jobs → Visual ETL → Script editor を選択
- Create job をクリック
2. ジョブの基本設定
Job details タブを選択し、設定を追加してきます。
| 設定項目 | 設定値 |
|---|---|
| Name | consumption-xml-to-iceberg |
| IAM Role | consumption-iceberg-glue-role |
| Type | Spark |
| Glue version | Glue 5.0 |
| Language | Python 3 |
| Script path | s3://consumption-iceberg/scripts/consumption-xml-to-iceberg.py |
| Temporary path | s3://consumption-iceberg/temporary/ |
| Spark UI logs path | s3://consumption-iceberg/sparkHistoryLogs/ |
| Job bookmark | Enable |
Job Bookmark について: 有効にすると、処理済みファイルを記録し、次回実行時にスキップします。
3. ジョブパラメータ(Job parameters)
Key-value pairs に以下を追加:
| Key | Value | 説明 |
|---|---|---|
--datalake-formats |
iceberg |
Icebergサポートを有効化 |
4. IAMロールの設定
IAMロール consumption-iceberg-glue-role には、以下の2つの設定が必要です。
(1) 信頼関係(Trust Relationship)
ロールを作成する際、「信頼されたエンティティタイプ」で AWS Service を選択し、ユースケースで Glue を選択します。
(2) 許可ポリシー(Permissions Policy)
ロール作成後、以下のポリシーを追加します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:::consumption-iceberg/*" ] }, { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::consumption-iceberg" ] }, { "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:CreateTable", "glue:UpdateTable", "glue:GetPartitions", "glue:BatchCreatePartition" ], "Resource": "*" } ] }
処理フロー
import sys from awsglue.utils import getResolvedOptions from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.window import Window # ジョブ引数の取得 args = getResolvedOptions(sys.argv, ['JOB_NAME']) # Sparkセッションの初期化(Iceberg対応) spark = SparkSession.builder \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.glue_catalog.warehouse", "s3://consumption-iceberg/iceberg/") \ .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ .getOrCreate() glueContext = GlueContext(spark.sparkContext) job = Job(glueContext) job.init(args['JOB_NAME'], args) # --------------------------------------------------------- # 1. XMLデータの読み込み # --------------------------------------------------------- input_path = "s3://consumption-iceberg/xml/consumption/" dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={ "paths": [input_path], "recurse": True }, format="xml", format_options={ "rowTag": "record" }, transformation_ctx="datasource0" ) if dyf.count() > 0: # DynamicFrame を Spark DataFrame に変換 xml_df = dyf.toDF() # --------------------------------------------------------- # 2. データ変換(48コマを横持ちカラムに展開) # --------------------------------------------------------- select_exprs = [ F.col("external_id"), F.col("target_date").cast("date"), F.col("updated_at").cast("timestamp"), F.current_timestamp().alias("loaded_at") ] # 48コマ(00:00〜23:30)を展開 for h in range(24): for m in [0, 30]: time_str = f"{h:02d}:{m:02d}" col_name = f"kwh_{h:02d}{m:02d}" expr = F.expr(f"filter(usage_kwh.slot, x -> x._time = '{time_str}')[0]._VALUE") select_exprs.append(expr.cast("decimal(10,2)").alias(col_name)) df_wide = xml_df.select(*select_exprs) # --------------------------------------------------------- # 3. Icebergテーブルへの書き込み(MERGE) # --------------------------------------------------------- target_table = "glue_catalog.consumption_iceberg_db.consumption" # 重複排除(同じキーで最新の updated_at のみ残す) windowSpec = Window.partitionBy("external_id", "target_date").orderBy(F.col("updated_at").desc()) df_deduped = df_wide.withColumn("rank", F.row_number().over(windowSpec)) \ .filter("rank == 1") \ .drop("rank") df_deduped.createOrReplaceTempView("source_data") # MERGE(キーが一致すれば更新、なければ挿入) spark.sql(f""" MERGE INTO {target_table} t USING source_data s ON t.external_id = s.external_id AND t.target_date = s.target_date WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * """) else: print("--- No new files to process ---") job.commit()
自動実行の設定(S3トリガー)
1. S3 EventBridge通知の有効化
- S3コンソールで
consumption-icebergバケットを開く - Properties タブの Amazon EventBridge セクションで Edit をクリック
- On に設定して保存
2. Glue Workflow の作成
- Glue コンソールの Workflows → Create workflow
- Name:
consumption-etl-workflow - Add trigger:
- Type:
EventBridge event - Name:
consumption-etl-workflow-start-trigger
- Type:
- Add node:
- Job:
consumption-xml-to-iceberg
- Job:
3. EventBridgeルールの作成
Glue Workflow を起動するためのルールを作成します。
1. Amazon EventBridge コンソールで Rules → Create rule
| 項目 | 設定値 |
|---|---|
| Name | s3-xml-upload-to-glue-workflow |
| Event bus | default |
| Rule type | Rule with an event pattern |
2. Event pattern を以下のように設定:
{ "source": ["aws.s3"], "detail-type": ["Object Created"], "detail": { "bucket": { "name": ["consumption-iceberg"] }, "object": { "key": [{ "prefix": "xml/consumption/" }] } } }
3. Target を設定:
| 項目 | 設定値 |
|---|---|
| Target types | AWS service |
| Select a target | Glue Workflow |
| Workflow name | consumption-etl-workflow |
ここまでの作業でIcebergテーブルの作成とS3にアップロードしたXMLファイル処理の実装が完了しました。
次は動作の確認を行なっていきます。
4. 動作確認
1. XMLファイルをS3にアップロードすると

2. Glue Workflowsが起動し

3. Glue Jobが実行されます

5. S3 上の Iceberg テーブル構造を確認
data/(電力使用量 Parquet ファイル):
aws s3 ls s3://consumption-iceberg/iceberg/consumption/data/ --recursive | head -10
2025-12-14 08:36:23 38111 data/0000/0010/0000/00010000/target_date=2024-01-14/00000-93-xxxxx.parquet 2025-12-13 20:18:18 41429 data/0000/0110/1011/11000111/target_date=2024-01-04/00000-41-xxxxx.parquet 2025-12-13 22:50:57 37540 data/0000/0110/1101/00011001/target_date=2024-01-13/00000-92-xxxxx.parquet 2025-12-13 20:24:07 41618 data/0000/0111/0010/10011011/target_date=2024-01-05/00000-41-xxxxx.parquet 2025-12-13 20:24:07 41087 data/0000/0111/1111/00101111/target_date=2024-01-02/00000-41-xxxxx.parquet ...
metadata/(Iceberg メタデータ):
aws s3 ls s3://consumption-iceberg/iceberg/consumption/metadata/ | head -10
2025-12-13 14:39:19 6770 00000-xxxxx.metadata.json 2025-12-13 18:29:46 8138 00001-xxxxx.metadata.json 2025-12-13 19:11:01 14097 00002-xxxxx.metadata.json 2025-12-13 19:16:49 15146 00003-xxxxx.metadata.json 2025-12-13 19:18:46 16453 00004-xxxxx.metadata.json ...
備考:
data/配下はハッシュプレフィックスで分散配置され、metadata/には MERGE のたびにスナップショットが記録されます。
Athena クエリ例
分析クエリ
取り込んだデータを取得
SELECT * FROM "consumption_iceberg_db"."consumption" order by target_date asc, external_id asc;

需要家ごとの日別合計(48カラムから計算)
SELECT external_id, target_date, (kwh_0000 + kwh_0030 + kwh_0100 + kwh_0130 + kwh_0200 + kwh_0230 + kwh_0300 + kwh_0330 + kwh_0400 + kwh_0430 + kwh_0500 + kwh_0530 + kwh_0600 + kwh_0630 + kwh_0700 + kwh_0730 + kwh_0800 + kwh_0830 + kwh_0900 + kwh_0930 + kwh_1000 + kwh_1030 + kwh_1100 + kwh_1130 + kwh_1200 + kwh_1230 + kwh_1300 + kwh_1330 + kwh_1400 + kwh_1430 + kwh_1500 + kwh_1530 + kwh_1600 + kwh_1630 + kwh_1700 + kwh_1730 + kwh_1800 + kwh_1830 + kwh_1900 + kwh_1930 + kwh_2000 + kwh_2030 + kwh_2100 + kwh_2130 + kwh_2200 + kwh_2230 + kwh_2300 + kwh_2330) as total_kwh FROM consumption_iceberg_db.consumption ORDER BY target_date;

まとめ
XMLフォーマットで提供される電力使用量データを、AWS Glue(PySpark)でパースして、Iceberg テーブルに取り込み、Athena でデータ分析できるようになりました。
Icebergを導入することでParquet ファイルを直接編集することなくSQL文を使用してIceberg テーブルにUPSERTできる点、CREATE文にPARTITIONEDを指定することでParquet ファイルをパーティション毎に分割できるようになり便利だと感じました。