GCSトリガーでのCSVインポートはじめに
Google Cloudでデータ取込のパイプラインを開発しました。
その際に、Cloud Storageにcsvが配置されたことをトリガーにBigQueryにデータ取込を実行する処理を作りました。
そちらのコードが調べても意外と出てこなかったのでサンプルコードとアーキテクチャをまとめてみました。
GCSトリガーでBigQueryにcsvインポートしてやりたかったこと
- Google Cloud環境にて、 csvファイル取り込み→データ加工→可視化→分析 を自動で出来るようにしたかった
- csvファイルの配置はデータ分析者(事業部側のメンバー)が行う想定だった
GCSトリガーの設定について補足
- Cloud Functionsで処理を実行する(今回は言語:python)
- Cloud Storageにcsvファイルを配置したことをトリガーにBigQueryにデータインポートする&取り込むファイルをファイル名で判定する
- 取り込んだ日本語が文字化けしないようcsvファイルはUTF-8を指定する
Cloud Functionsとは、サーバーレスにアプリが実行できるサービスです。
例えば、Cloud Functionsにコードを記述することでCloud StorageのデータをBigQueryに取り込むことやBigQueryのデータを加工することが可能です。ファイル配置や時間指定のトリガーが設定できます。MAX9分でタイムアウトするので、大規模な処理には向かないかもしれません。
BigQueryにcsv取込までのアーキテクチャ
Cloud Functionsに記述したコードサンプル
下記のコードをCloud Functionsのpythonファイルに記述しました
※青字部分はコメント
※先に取込ファイルとカラム名・データ型を合わせたテーブルをBigqueryで作成しておく
from google.cloud import bigquery
client = bigquery.Client()
def main(data, context):
# Cloud Storageに配置されたファイル情報の取得
bucket_name = data["bucket"]
file_name = data["name"]
uri = 'gs://{}/{}'.format(bucket_name, file_name)
# インポートしたいcsvファイル名に含まれる文字列を指定
# ⇔取り込むファイル名には毎回 csv_file で指定した文字列を含む必要がある
csv_file = "ファイル名に含まれる文字列"
# データ更新先のBigquery内データセット名・テーブル名を指定
# Cloud Storageに配置されたファイル名が違う場合は処理終了となる
dataset_id = "取込先のBQのデータセット名"
if csv_file in file_name:
table_id = "取込先のBQのテーブル名"
else:
print('end')
return
dataset_ref = client.dataset(dataset_id)
# スキーマ定義(列1~3が取込先のBQのカラム、必要に応じ増やす)
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("列1", "STRING"),
bigquery.SchemaField("列2", "STRING"),
bigquery.SchemaField("列3", "STRING")
]
)
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows=1 ←ヘッダーとばすの意味
job_config.write_disposition = 'WRITE_TRUNCATE' ←テーブルをrepalaceするの意味
# BQへのロードJOB(このままでOK)
load_job = client.load_table_from_uri(
uri, dataset_ref.table(table_id), job_config=job_config
)
load_job.result()
print('success load')
GCSトリガーでのCSVインポートまとめ
以上が、GCSトリガーでのBigqueryへのCSVインポートの解説となります。
GCSトリガーはファイルの配置時間が読めないときに有用なトリガーになります。
ケースに合わせ、ぜひご参考ください!
もしプログラム学習でお困りでしたら受講生に1人ずつ現役のプロのパーソナルメンターがつく
TechAcademy [テックアカデミー] のご活用をぜひご検討ください!
Google Cloud関連記事
- データ分析基盤のアーキテクチャ全体はこちらとなります
- CloudFunctionsでのBigQueryテーブルへのクエリ実行について、こちらにまとめておりますのでご参考ください。
- その他Google Cloud関連記事をこちらにまとめております。
参考になれば幸いです☕
コメント