ENECHANGE Developer Blog

ENECHANGE開発者ブログ

AWS Glue + Iceberg で分析基盤を構築し、電力データを取り込んでみる

はじめに

データレイクのテーブルフォーマット Apache Iceberg というものがあると知りました。

Icebergは、大規模な分析テーブル向けの高性能フォーマットです。IcebergはSQLテーブルの信頼性と簡便性をビッグデータにもたらすと同時に、Spark、Trino、Flink、Presto、Hive、Impalaといったエンジンが、同じテーブルを同時に安全に操作できるようにします。

弊社プロダクトの中には大量の電力使用量データを扱うものもありParquetファイルを利用しているのですが、電力需要家ごとにParquetファイルを分割し、編集時にはPythonでParquet ファイルを直接更新しています。

Apache Iceberg を使うことによりこの辺りの管理を上手くできないかと考えました。

そこで今回は、「XMLフォーマットで提供される電力使用量データ」を題材に、AWS Glue と Iceberg を使った分析基盤を構築していきたいと思います。

やること

XMLフォーマットで提供される電力使用量データを、AWS Glue(PySpark)でパースしてIceberg テーブルに取り込み(実際のファイルは Parquet ファイル)、Athena でデータ分析できるようにする。

アーキテクチャ概要

データの流れは以下の通りです。

  1. XMLファイルが S3 (s3://consumption-iceberg/xml/consumption/) にアップロードされる
  2. EventBridge が検知し、Glue Workflow を起動
  3. Glue Workflow が Glue Job を実行
  4. Glue ジョブが処理を実行
    • XMLをネイティブに読み込み
    • ネストされた構造をフラットなテーブル形式(横持ち)に変換
    • Iceberg テーブルに MERGE INTO で UPSERT(挿入・更新)
  5. S3 上の Iceberg テーブル (s3://consumption-iceberg/iceberg/consumption/)
  6. Athena から標準SQLでクエリ

サンプルデータ仕様

電力使用量データフォーマットは、検証用に作成した簡易な扱いやすいものです。

  • 1つのXMLファイルに複数の需要家分の電力使用量データが含まれます。
    • 需要家ID (external_id): STRING
    • 対象日 (target_date): DATE
    • 更新日時 (updated_at): TIMESTAMP
    • 30分ごとの電力使用量: 48コマ (00:00, 00:30, ... 23:30)
  • ファイル配置: s3://consumption-iceberg/xml/consumption/YYYYMMDD/consumption.xml

サンプル XML 構造

<?xml version="1.0" encoding="UTF-8"?>
<electricity_usage_report>
  <record>
    <external_id>CUST-001234</external_id>
    <target_date>2024-01-15</target_date>
    <updated_at>2024-01-16T06:30:00+09:00</updated_at>
    <usage_kwh>
      <slot time="00:00">1.2</slot>
      <slot time="00:30">1.1</slot>
      <!-- ... 48コマ ... -->
      <slot time="23:30">1.3</slot>
    </usage_kwh>
  </record>
  <record>
    <external_id>CUST-001235</external_id>
    <target_date>2024-01-15</target_date>
    <updated_at>2024-01-16T06:30:00+09:00</updated_at>
    <usage_kwh>
      <slot time="00:00">1.2</slot>
      <slot time="00:30">1.1</slot>
      <!-- ... 48コマ ... -->
      <slot time="23:30">1.3</slot>
    </usage_kwh>
  </record>
</electricity_usage_report>

テーブル設計

AWS Console の Amazon Athena で実施していきます。

1. Athena Workgroup の作成

  • ワークグループ名 : consumption-iceberg-workgroup
  • 分析エンジン タイプ: Athena SQL

2. データベース 作成

クエリエディタで、ワークグループ名 : consumption-iceberg-workgroup を選択して、以下のCREATE 文を実行します。

CREATE DATABASE IF NOT EXISTS consumption_iceberg_db LOCATION 's3://consumption-iceberg/iceberg/'

3. 電力使用量テーブル

ワークグループ名 : consumption-iceberg-workgroup に consumption_iceberg_db データベースができたら、電力使用量テーブルを作成します。

CREATE TABLE consumption (
  external_id STRING,
  target_date DATE,
  kwh_0000 DECIMAL(10,2),
  kwh_0030 DECIMAL(10,2),
  kwh_0100 DECIMAL(10,2),
  kwh_0130 DECIMAL(10,2),
  kwh_0200 DECIMAL(10,2),
  kwh_0230 DECIMAL(10,2),
  kwh_0300 DECIMAL(10,2),
  kwh_0330 DECIMAL(10,2),
  kwh_0400 DECIMAL(10,2),
  kwh_0430 DECIMAL(10,2),
  kwh_0500 DECIMAL(10,2),
  kwh_0530 DECIMAL(10,2),
  kwh_0600 DECIMAL(10,2),
  kwh_0630 DECIMAL(10,2),
  kwh_0700 DECIMAL(10,2),
  kwh_0730 DECIMAL(10,2),
  kwh_0800 DECIMAL(10,2),
  kwh_0830 DECIMAL(10,2),
  kwh_0900 DECIMAL(10,2),
  kwh_0930 DECIMAL(10,2),
  kwh_1000 DECIMAL(10,2),
  kwh_1030 DECIMAL(10,2),
  kwh_1100 DECIMAL(10,2),
  kwh_1130 DECIMAL(10,2),
  kwh_1200 DECIMAL(10,2),
  kwh_1230 DECIMAL(10,2),
  kwh_1300 DECIMAL(10,2),
  kwh_1330 DECIMAL(10,2),
  kwh_1400 DECIMAL(10,2),
  kwh_1430 DECIMAL(10,2),
  kwh_1500 DECIMAL(10,2),
  kwh_1530 DECIMAL(10,2),
  kwh_1600 DECIMAL(10,2),
  kwh_1630 DECIMAL(10,2),
  kwh_1700 DECIMAL(10,2),
  kwh_1730 DECIMAL(10,2),
  kwh_1800 DECIMAL(10,2),
  kwh_1830 DECIMAL(10,2),
  kwh_1900 DECIMAL(10,2),
  kwh_1930 DECIMAL(10,2),
  kwh_2000 DECIMAL(10,2),
  kwh_2030 DECIMAL(10,2),
  kwh_2100 DECIMAL(10,2),
  kwh_2130 DECIMAL(10,2),
  kwh_2200 DECIMAL(10,2),
  kwh_2230 DECIMAL(10,2),
  kwh_2300 DECIMAL(10,2),
  kwh_2330 DECIMAL(10,2),
  updated_at TIMESTAMP,
  loaded_at TIMESTAMP
)
PARTITIONED BY (target_date)
LOCATION 's3://consumption-iceberg/iceberg/consumption/'
TBLPROPERTIES (
  'table_type' = 'ICEBERG',
  'format' = 'parquet',
  'write_compression' = 'snappy'
);

CREATE文には、スキーマ定義部分の他に以下を指定します。

テーブルプロパティ

プロパティ 説明 設定値
table_type テーブルタイプをIcebergとして指定 ICEBERG
format データファイルのフォーマットを指定 parquet
write_compression Parquetファイルの圧縮コーデックを指定 snappy

parquetファイルのパーティション

Parquet ファイルの物理的な配置方法を指定します。 今回は、target_date を対象とします。

設定値
PARTITIONED BY () target_date

parquetファイルの保存先

設定値
LOCATION s3://consumption-iceberg/iceberg/consumption/

ここまでで電力使用量を保存するするためのテーブル作成は完了しました。

次にXMLファイルを処理するための仕組みを構築していきましょう。

AWS Glue ジョブ設計

AWSコンソールでのジョブ作成手順

1. Glue Jobの作成

  1. AWS Management Console にログイン
  2. AWS Glue サービスを開く
  3. 左メニューから ETL JobsVisual ETLScript editor を選択
  4. Create job をクリック

2. ジョブの基本設定

Job details タブを選択し、設定を追加してきます。

設定項目 設定値
Name consumption-xml-to-iceberg
IAM Role consumption-iceberg-glue-role
Type Spark
Glue version Glue 5.0
Language Python 3
Script path s3://consumption-iceberg/scripts/consumption-xml-to-iceberg.py
Temporary path s3://consumption-iceberg/temporary/
Spark UI logs path s3://consumption-iceberg/sparkHistoryLogs/
Job bookmark Enable

Job Bookmark について: 有効にすると、処理済みファイルを記録し、次回実行時にスキップします。

3. ジョブパラメータ(Job parameters)

Key-value pairs に以下を追加:

Key Value 説明
--datalake-formats iceberg Icebergサポートを有効化

4. IAMロールの設定

IAMロール consumption-iceberg-glue-role には、以下の2つの設定が必要です。

(1) 信頼関係(Trust Relationship)

ロールを作成する際、「信頼されたエンティティタイプ」で AWS Service を選択し、ユースケースで Glue を選択します。

(2) 許可ポリシー(Permissions Policy)

ロール作成後、以下のポリシーを追加します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::consumption-iceberg/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::consumption-iceberg"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetTable",
        "glue:CreateTable",
        "glue:UpdateTable",
        "glue:GetPartitions",
        "glue:BatchCreatePartition"
      ],
      "Resource": "*"
    }
  ]
}

処理フロー

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ジョブ引数の取得
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Sparkセッションの初期化(Iceberg対応)
spark = SparkSession.builder \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://consumption-iceberg/iceberg/") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()

glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ---------------------------------------------------------
# 1. XMLデータの読み込み
# ---------------------------------------------------------
input_path = "s3://consumption-iceberg/xml/consumption/"

dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [input_path],
        "recurse": True
    },
    format="xml",
    format_options={
        "rowTag": "record"
    },
    transformation_ctx="datasource0"
)

if dyf.count() > 0:
    # DynamicFrame を Spark DataFrame に変換
    xml_df = dyf.toDF()

    # ---------------------------------------------------------
    # 2. データ変換(48コマを横持ちカラムに展開)
    # ---------------------------------------------------------
    select_exprs = [
        F.col("external_id"),
        F.col("target_date").cast("date"),
        F.col("updated_at").cast("timestamp"),
        F.current_timestamp().alias("loaded_at")
    ]

    # 48コマ(00:00〜23:30)を展開
    for h in range(24):
        for m in [0, 30]:
            time_str = f"{h:02d}:{m:02d}"
            col_name = f"kwh_{h:02d}{m:02d}"
            expr = F.expr(f"filter(usage_kwh.slot, x -> x._time = '{time_str}')[0]._VALUE")
            select_exprs.append(expr.cast("decimal(10,2)").alias(col_name))

    df_wide = xml_df.select(*select_exprs)

    # ---------------------------------------------------------
    # 3. Icebergテーブルへの書き込み(MERGE)
    # ---------------------------------------------------------
    target_table = "glue_catalog.consumption_iceberg_db.consumption"

    # 重複排除(同じキーで最新の updated_at のみ残す)
    windowSpec = Window.partitionBy("external_id", "target_date").orderBy(F.col("updated_at").desc())
    df_deduped = df_wide.withColumn("rank", F.row_number().over(windowSpec)) \
        .filter("rank == 1") \
        .drop("rank")

    df_deduped.createOrReplaceTempView("source_data")

    # MERGE(キーが一致すれば更新、なければ挿入)
    spark.sql(f"""
        MERGE INTO {target_table} t
        USING source_data s
        ON t.external_id = s.external_id AND t.target_date = s.target_date
        WHEN MATCHED THEN
            UPDATE SET *
        WHEN NOT MATCHED THEN
            INSERT *
    """)
else:
    print("--- No new files to process ---")

job.commit()

自動実行の設定(S3トリガー)

1. S3 EventBridge通知の有効化

  1. S3コンソールで consumption-iceberg バケットを開く
  2. Properties タブの Amazon EventBridge セクションで Edit をクリック
  3. On に設定して保存

2. Glue Workflow の作成

  1. Glue コンソールの WorkflowsCreate workflow
  2. Name: consumption-etl-workflow
  3. Add trigger:
    • Type: EventBridge event
    • Name: consumption-etl-workflow-start-trigger
  4. Add node:
    • Job: consumption-xml-to-iceberg

3. EventBridgeルールの作成

Glue Workflow を起動するためのルールを作成します。

1. Amazon EventBridge コンソールで RulesCreate rule

項目 設定値
Name s3-xml-upload-to-glue-workflow
Event bus default
Rule type Rule with an event pattern

2. Event pattern を以下のように設定:

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["consumption-iceberg"]
    },
    "object": {
      "key": [{
        "prefix": "xml/consumption/"
      }]
    }
  }
}

3. Target を設定:

項目 設定値
Target types AWS service
Select a target Glue Workflow
Workflow name consumption-etl-workflow

ここまでの作業でIcebergテーブルの作成とS3にアップロードしたXMLファイル処理の実装が完了しました。

次は動作の確認を行なっていきます。

4. 動作確認

1. XMLファイルをS3にアップロードすると

2. Glue Workflowsが起動し

3. Glue Jobが実行されます

5. S3 上の Iceberg テーブル構造を確認

data/(電力使用量 Parquet ファイル):

aws s3 ls s3://consumption-iceberg/iceberg/consumption/data/ --recursive | head -10
2025-12-14 08:36:23  38111 data/0000/0010/0000/00010000/target_date=2024-01-14/00000-93-xxxxx.parquet
2025-12-13 20:18:18  41429 data/0000/0110/1011/11000111/target_date=2024-01-04/00000-41-xxxxx.parquet
2025-12-13 22:50:57  37540 data/0000/0110/1101/00011001/target_date=2024-01-13/00000-92-xxxxx.parquet
2025-12-13 20:24:07  41618 data/0000/0111/0010/10011011/target_date=2024-01-05/00000-41-xxxxx.parquet
2025-12-13 20:24:07  41087 data/0000/0111/1111/00101111/target_date=2024-01-02/00000-41-xxxxx.parquet
...

metadata/(Iceberg メタデータ):

aws s3 ls s3://consumption-iceberg/iceberg/consumption/metadata/ | head -10
2025-12-13 14:39:19   6770 00000-xxxxx.metadata.json
2025-12-13 18:29:46   8138 00001-xxxxx.metadata.json
2025-12-13 19:11:01  14097 00002-xxxxx.metadata.json
2025-12-13 19:16:49  15146 00003-xxxxx.metadata.json
2025-12-13 19:18:46  16453 00004-xxxxx.metadata.json
...

備考: data/ 配下はハッシュプレフィックスで分散配置され、metadata/ には MERGE のたびにスナップショットが記録されます。

Athena クエリ例

分析クエリ

取り込んだデータを取得

SELECT * FROM "consumption_iceberg_db"."consumption" order by target_date asc, external_id asc;

需要家ごとの日別合計(48カラムから計算)

SELECT
  external_id,
  target_date,
  (kwh_0000 + kwh_0030 + kwh_0100 + kwh_0130 + kwh_0200 + kwh_0230 +
   kwh_0300 + kwh_0330 + kwh_0400 + kwh_0430 + kwh_0500 + kwh_0530 +
   kwh_0600 + kwh_0630 + kwh_0700 + kwh_0730 + kwh_0800 + kwh_0830 +
   kwh_0900 + kwh_0930 + kwh_1000 + kwh_1030 + kwh_1100 + kwh_1130 +
   kwh_1200 + kwh_1230 + kwh_1300 + kwh_1330 + kwh_1400 + kwh_1430 +
   kwh_1500 + kwh_1530 + kwh_1600 + kwh_1630 + kwh_1700 + kwh_1730 +
   kwh_1800 + kwh_1830 + kwh_1900 + kwh_1930 + kwh_2000 + kwh_2030 +
   kwh_2100 + kwh_2130 + kwh_2200 + kwh_2230 + kwh_2300 + kwh_2330) as total_kwh
FROM consumption_iceberg_db.consumption
ORDER BY target_date;

まとめ

XMLフォーマットで提供される電力使用量データを、AWS Glue(PySpark)でパースして、Iceberg テーブルに取り込み、Athena でデータ分析できるようになりました。

Icebergを導入することでParquet ファイルを直接編集することなくSQL文を使用してIceberg テーブルにUPSERTできる点、CREATE文にPARTITIONEDを指定することでParquet ファイルをパーティション毎に分割できるようになり便利だと感じました。