NTTドコモR&Dの技術ブログです。

【Python】つまずきながらawswranglerでAthenaにデータ送ってみた

はじめに

本記事はNTTドコモ R&D Advent Calendar 2022カレンダー2の20日目の記事になります。

株式会社ドコモ・インサイトマーケティングの鈴木(里)です。 モバイル空間統計®*1での位置情報データの分析に携わっています。

先日、社内のデータベースからAWS*2Athena*3にデータを送る必要がありました。

その中で様々なつまずきがあったため、ここに事象と対処をまとめたいと思います。

全体像

データの流れ

プログラミング言語はPythonを使用します。

やっていることは下記の3つです。

(1) データベースからデータをpandas.DataFrame*4形式で取得(今回は割愛)

(2) ローカルPCでDataFrameを適宜加工

(3) AWS SDK for pandas(awswrangler)*5を用いてAthenaにテーブル作成

動作環境

Windows10

Python3.10.4

aws-cli/2.7.31

AWS CLI環境設定時のつまずき

AWS CLIは以下のサイトなどからインストールすることができます。

docs.aws.amazon.com

複数のAWS環境につなぐ必要がある場合は環境ごとに「名前付きプロファイル」を作って使い分けをしているのですが

ここで最初のつまずきがありました。

「C:\Users\ユーザ名」のパスが悪さをする

■事象

Windowsの場合、AWS CLIに関する認証情報やコンフィグはおそらく「C:\Users\ユーザ名\.aws」フォルダ配下に配置されますが Windowsのユーザ名によっては困ったことになります。

例えばユーザ名が「田中 太郎」だった場合、上記フォルダは「C:\Users\田中 太郎\.aws」になりますが、 フォルダパスにスペースが含まれてしまいます。

するとPython等から名前付きプロファイルを読み込もうとした際、エラーになってしまうことがあります。

「C:\Users\ユーザ名」のフォルダパスはWindows側でユーザ名を変えてもそのままなので、 別のユーザを作成してそちらで作業する...ということができない環境の場合、困ったことになります(なりました)。

input

import awswrangler as wr
import boto3
session = boto3.Session(profile_name='my_profile_name') # 自分で作成した名前付きプロファイルに置き換える

# Athenaのdatabase一覧を取得
databases = wr.catalog.databases()

output

    raise NoRegionError()
botocore.exceptions.NoRegionError: You must specify a region.

→regionをコンフィグで指定していてもNoRegionError

output2

(os.environ['AWS_DEFAULT_REGION'] = 'ap-northeast-1'等でregionを指定した場合)

    raise NoCredentialsError()
botocore.exceptions.NoCredentialsError: Unable to locate credentials

→認証情報ファイルはあるのにUnable to locateエラー

■対処

事象発生時は気づかなかったのですが、冷静に考えると認証情報やコンフィグを別の場所に移せばこの事象を回避できそうです。

.awsフォルダごと別の場所に移すことでこれを回避していきます。ここではCドライブ直下に移してみます。

認証情報とコンフィグのパスは、下記の環境変数で指定することができます。

  • AWS認証情報のパス:AWS_SHARED_CREDENTIALS_FILE

  • AWSコンフィグのパス:AWS_CONFIG_FILE

スタートメニュー→envで検索→システム環境変数の編集→環境変数 から、下記の通り設定していきます。

  • 変数名:AWS_SHARED_CREDENTIALS_FILE、変数値:C:\.aws\credentials

  • 変数名:AWS_CONFIG_FILE、変数値:C:\.aws\config

環境変数設定画面

※環境変数を再読み込みさせるため、一度PCを再起動したほうがいいかもしれません

無事、名前付きプロファイルを読み込めるようになりました!

input

import awswrangler as wr
import boto3
session = boto3.Session(profile_name='my_profile_name') # 自分で作成した名前付きプロファイルに置き換える

# Athenaのdatabase一覧を取得
databases = wr.catalog.databases()
print(databases)

output

(データベース一覧のリスト)

※ちなみに私の環境だと、名前付きプロファイルとは別にdefaultのプロファイルを1つ定義しておかないとうまく動きませんでした(credentialsとconfig両方のファイルに書く必要がある)

DataFrame取得・加工処理でのつまずき

暗黙の型変換でつまずきました。

int型のカラムがいつの間にかfloat型に!?

■事象

int型のはずのカラム*8になぜかfloat型のデータが入っていました。 (テストデータでは事象発生しなかったため発見が遅れ、ちょっと焦りました)

調べた結果、初歩的な部分ですが暗黙の型変換が原因だとわかりました。 pandasには「整数の列でも中身に1つでもNULLのデータが含まれる場合、当該カラムをfloat型にする」仕様があり、これに引っかかっていました。

pandas.pydata.org

Because NaN is a float, a column of integers with even one missing values is cast to floating-point dtype

どうやらデータベース上で当該カラムにわずかにNULLのデータが入っており、DataFrameとして読み込んだ際にfloatになってしまったようです。

input

(データベースからデータを読み込む部分は割愛、DataFrameで代用)

import pandas as pd
import numpy as np
df = pd.DataFrame({'num':[1,np.nan,3],#カラム「num」はintだがNULLが含まれている
              'id':['006','005','106'],
              'value':[7.0,8.0,np.nan],
              'attr':[10,20,30]}
              )
print(df)

output

   num   id  value  attr
0  1.0  006    7.0    10
1  NaN  005    8.0    20
2  3.0  106    NaN    30

→intのはずのnum列がfloatになっている

■対処

いくつか方法がありそうですが、ここでは「NULLを穴埋めしてから対象カラムをint型に指定しなおす」ことで事なきを得ました。

input

import pandas as pd
import numpy as np
df = pd.DataFrame({'num':[1,np.nan,3],
              'id':['006','005','106'],
              'value':[7.0,8.0,np.nan],
              'attr':[10,20,30]}
              )
              
#欠損値を0で穴埋め:この時点ではまだnum列はfloat
df['num'] = df['num'].fillna(0) 

#num列をintにキャスト
df['num'] = df['num'].astype(int)

print(df)

output

   num   id  value  attr
0    1  006    7.0    10
1    0  005    8.0    20
2    3  106    NaN    30

→num列は整数のまま!

Athena(およびawswrangler)作業時のつまずき

pandasを使う際に各種AWSサービスの操作を助けてくれるライブラリ「AWS SDK for pandas(awswrangler)」を使うことでかなり楽ができたのですが、いくつかつまずきがありました。

S3に出力されるparquetファイル名を指定できない

■事象

awswrangler.s3.to_parquet関数を用いると、DataFrameから簡単にAthenaにテーブルを作成することができます。 しかし、データの実体であるS3バケット*9上のファイル*10以下のようにランダムな名前になってしまいます。

input

import awswrangler as wr
import boto3
wr.s3.to_parquet(df=df,  # Athenaに出力したいDataFrame
        ...略...
    )

output (S3バケット上に生成されたファイル)

99c43cb726fa42f68a1c7b6b899f2f52.snappy.parquet

やむを得ない措置だそうですが、 このままだとファイルが何を示すのかわかりづらいですね。

■対処

ファイル名を完全に指定することは現状できないようですが、 to_parquet関数のfilename_prefixに任意の文字列を渡すことでファイル名の先頭の文字列を指定することができます

例えば、filename_prefix = 'test_20221201_'と指定した場合のファイル名は下記のようになります。

input

import awswrangler as wr
import boto3
filename_prefix = 'test_20221201_'
wr.s3.to_parquet(df=df, # Athenaに出力したいDataFrame
    ...略...
    filename_prefix=filename_prefix, # ファイル名の頭につく文言
    ...略...   
    ) 

output (S3バケット上に生成されたファイル)

test_20221201_99c43cb726fa42f68a1c7b6b899f2f52.snappy.parquet

データの種類や追加した日などをfilename_prefixに指定することで、ファイルを区別しやすくすることができます。

間違ったファイルを追加してしまった

■事象

ダミーデータでのテスト中に、うっかり同じデータを2回テーブルに追加してしまいました...

Athenaテーブル

■対処

Athenaにあるテーブルの実体は、S3バケットに置かれたデータです。

to_parquet()でAthenaに追加されたデータは、下記のように「.parquet」という形式に圧縮されたファイルとして、S3バケットの指定した場所に配置されています。

S3バケットに置かれたparquetファイル

したがって、名前や更新日時から「誤って追加されたデータ」を特定できれば、当該データを削除することでテーブルを元の状態に戻すことができます

(上記の例だとfilename_prefixが全て'test_20221201_'なので、最終更新日時で区別しました)

重複している部分を削除する

データ削除後にAthena上で対象のテーブルを見てみると、誤って追加された部分が消えていることを確認できます。

Athenaテーブル

おわりに

新しくコードを書いたり環境を作ったりするときは大抵つまずきがあるものですが、強く生きていきたいですね。

よいお年を。

おまけ

Athena接続部テストコード

import pandas as pd
import awswrangler as wr
import boto3
from botocore.config import Config

# Athena関連設定クラス
class AWSSetting() :
    profile_name = '作成したプロファイル名' # AWS CLIプロファイルを指定
    bucket_name = 'my_bucket' # parquetファイル出力先のS3バケット
    bucket_path = f's3://{bucket_name}/test/' # parquetファイルを置くS3バケットのパス
    parquet_file_prefix = 'test_20221201_' # parquetファイルの先頭に付与される文字列
    athena_database_name = 'test_db' # Athenaの出力先データベース名(≒スキーマ)

# Athena関連処理のクラス
class AthenaClient(AWSSetting) :
    # boto3セッション作成およびdatabase準備
    def __init__(self):
        # AWS CLIで設定したプロファイルを指定してsessionを取得
        self.session = boto3.Session(profile_name=self.profile_name)
        
        # Athenaのdatabase一覧を取得
        databases = wr.catalog.databases()
        
        # 所望のdatabaseが未作成なら作成する
        if self.athena_database_name not in databases.values:
            wr.catalog.create_database(self.athena_database_name)

    # DataFrameをAWSSettingで指定の宛先に出力
    def dataframe_to_athena(self, df, table_name) -> None:
        to_parquet_mode = ''
        # 出力先テーブルが存在するならTrue、存在しないならFalse
        table_exist = wr.catalog.does_table_exist(database=self.athena_database_name, table=table_name, boto3_session=self.session)
        if(table_exist) :
            to_parquet_mode = 'append' # テーブルが存在するなら追記モード(append)にする
        else : 
            to_parquet_mode = 'overwrite' # そうでないなら新規作成モード(overwrite)にする

        # S3にparquet形式でデータを置きつつAthenaにテーブル作成
        wr.s3.to_parquet(
            df=df, # Athenaに出力したいDataFrame
            path=self.bucket_path, # 出力されたparquetファイルを置くS3バケットのパス
            dataset=True,
            filename_prefix=self.parquet_file_prefix, # ファイル名の頭につける文言
            mode=to_parquet_mode, # 新規作成(overwrite) or 追記(append)
            database=self.athena_database_name, # 出力先のAthenaのdatabase名(≒スキーマ)
            table=table_name, # Athenaに作成されるテーブル名
            # partition_cols=['xxx'] , # パーティションを切る場合に指定。パーティションに使うカラムを指定(xxxにカラム名が入る)
            boto3_session=self.session
        )

# テスト用のDataFrameを作成する
data = [[1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]]
df = pd.DataFrame(data, columns=['a', 'b', 'c'])

athena_client = AthenaClient()
athena_table_name = 'my_table'
athena_client.dataframe_to_athena(df, athena_table_name)

参考にさせていただいたサイト

Amazon Athena とは

Amazon S3 とは

DataFrame(データフレーム)

AWS CLIでクレデンシャルファイルパスを環境変数から設定できるようになりました

Windows環境変数編集画面へのショートカットアクセス方法

(pandas user guide) Integer dtypes and missing data

(AWS SDK for pandas(awswrangler)) 4 - Parquet Datasets

(AWS SDK for pandas(awswrangler) Github issue) parquet dataset file name

*1:「モバイル空間統計」は株式会社NTTドコモの登録商標です

*2:Amazon Web Service:世界最大規模のクラウドプラットフォーム

*3:AWS Athena:アドホックな分析に適したデータベース

*4:pandas.DataFrame:Pythonでのデータ分析や機械学習でよく使われるデータ形式

*5:AWS SDK for pandas (awswrangler):pandasからAWSを便利に操作できるPythonのライブラリ。参考リンク先ではAWS Data Wranglerと呼ばれているが、これは古い名称。最近正式名称が変わった模様

*6:AWS CLI:コマンドライン上から様々なAWSのサービスを操作することができるツール。ここではPythonからAWSに対して各種操作をするために使用

*7:名前付きプロファイル:AWS CLIの認証情報とコンフィグのペアに名前を付けたもの。ユーザ側で名前を指定すれば所望の設定を呼び出せる

*8:カラム:列のこと

*9:S3:AWSのストレージサービス。ストレージは「バケット」という単位で作成/管理する

*10:AthenaはS3に対してクエリをかけるサービスなので、データの実体はS3バケットに配置される