データのマージPrivate Preview
既存のZilliz Cloudコレクションのデータとローカルファイルまたは外部オブジェクトストレージバケットのデータをマージして、両方のソースからデータを結合したコレクションを作成できます。これはデータマージ操作と呼ばれており、既存のコレクションにデータを持つフィールドを追加するための回避策として使用できます。
- この機能は現在プライベートプレビュー中です。この機能に興味があり、試してみたい場合は、Zilliz Cloudサポートまでお気軽にお問い合わせください。
概要
データマージ操作は、リレーショナルデータベースのLEFT JOIN操作に似ており、コレクションのデータと指定されたデータソースからのすべての一致するレコードを結合し、マージされたデータを新しいコレクションに格納します。
データソースは、Zilliz Cloudステージまたはオブジェクトストレージバケットに格納されたPARQUETファイルのセットである必要があります。
次の図に示すように、3つのフィールドを含むコレクションがあり、idフィールドが主キーとして機能します。さらに、idとdateという2つのフィールドを持つPARQUETファイルがあります。idフィールドはマージキーとして機能し、その値はソースコレクションの値と一致する必要があります。dateフィールドは追加されるフィールドです。

PARQUETファイルをZilliz Cloudステージまたはオブジェクトストレージバケットにアップロードすると、マージデータAPIを使用して、両方のソースからデータを格納するターゲットコレクションを作成できます。
データソースはオプションです。データソースを指定せずにマージデータAPIを使用して、既存のコレクションにフィールドを追加する回避策として使用することもできます。
このガイドでは、データあり・データなしでフィールドを追加する方法について説明します。
データのあるフィールドを追加
データのあるフィールドを追加するには、ソースコレクション、データソース、およびターゲットコレクションに追加する新しいフィールドを指定する必要があります。
データソースは、Zilliz CloudステージまたはAWS S3バケットのいずれかにあるPARQUETファイルのセットである必要があります。
ステージの使用
ステージを使用してデータマージ操作を実行するには、まずステージを作成し、ローカルファイルシステムからデータをアップロードします。その後、データマージ操作を実行して、既存のコレクションとステージのデータを組み合わせた新しいコレクションを作成できます。
次のコードスニペットは、ステージを使用してデータマージ操作を実行する方法を示しています。ステージの作成方法とデータのアップロード方法の詳細については、ステージの管理を参照してください。
export BASE_URL="https://api.cloud.zilliz.com"
export TOKEN="YOUR_API_KEY"
curl --request POST \
--url "${BASE_URL}/v2/etl/merge" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"clusterId": "in00-xxxxxxxxxxxxxxx",
"dbName": "my_database",
"collectionName": "my_collection",
"destDbName": "my_database",
"destCollectionName": "my_merged_collection",
"dataSource": {
"type": "stage",
"stageName": "my_stage",
"dataPath": "path/to/your/parquet.parquet"
},
"mergeField": "id",
"newFields": [
{
"fieldName": "date",
"dataType": "VARCHAR",
"params": {
"maxLength": 10
}
}
]
}'
上記のコマンドを実行する前に、注意が必要なフィールドがいくつかあります。
-
dbNameとcollectionNameこれら2つのパラメータは、データマージ操作のソースコレクションを決定します。
-
destDbNameとdestCollectionNameこれら2つのパラメータは、データマージ操作後に生成されるターゲットコレクションを決定します。ターゲットコレクションは、ソースコレクションと同じクラスターに属している必要があります。
-
dataSourceこのパラメータはオプションで、データソースタイプやソースコレクションからのデータとマージされ、ターゲットコレクションに保存される列方向データを含むParquetファイルのパスなどのデータソース設定を含みます。
中間ストレージとしてステージを使用する場合は、
typeをstageに設定した後にstageNameとdataPathを設定する必要があります。📘注意dataPathパラメータの値は、ステージのルートに対するファイルの絶対パス、または複数のParquetファイルを格納するステージ内のフォルダにすることができます。値がフォルダを指す場合は、フォルダ内のParquetファイルが同じデータ構造を持っていることを確認してください。
たとえば、値は
path/to/your/file.parquet(ファイル)またはpath/to/your/folder/(フォルダ)にすることができます。- データのないフィールドを追加する場合は、このパラメータを指定しないままにできます。
-
mergeFieldデータマージ操作は、リレーショナルデータベースシステムのLEFT JOIN操作に似ており、マージフィールドはソースコレクションと列方向データを含むParquetファイル間の共有キーとして機能します。
-
newFieldsこれは、データマージ操作後にターゲットコレクションに追加するフィールドのスキーマのリストです。サポートされているデータ型はVACHAR、INT8、INT16、INT32、INT64、FLOAT、DOUBLE、およびBOOLです。
上記のコマンドはデータマージジョブを作成し、そのIDを返します。
{
"code": 0,
"data": {
"jobId": "job-xxxxxxxxxxxxxxxxxxxxx"
}
}
オブジェクトストレージの使用
オブジェクトストレージバケットを使用してデータマージ操作を実行するには、まずオブジェクトストレージバケットを作成し、データをアップロードします。その後、データマージ操作を実行して、既存のコレクションとバケットのデータを組み合わせた新しいコレクションを作成できます。
次のコードスニペットは、オブジェクトストレージバケットを使用してデータマージ操作を実行する方法を示しています。ブロックストレージサービスプロバイダのドキュメントを参照して、バケットの作成方法とデータのアップロード方法を確認できます。
export BASE_URL="https://api.cloud.zilliz.com"
export TOKEN="YOUR_API_KEY"
curl --request POST \
--url "${BASE_URL}/v2/etl/merge" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"clusterId": "in00-xxxxxxxxxxxxxxx",
"dbName": "my_database",
"collectionName": "my_collection",
"destDbName": "my_database",
"destCollectionName": "my_merged_collection",
"dataSource": {
"type": "s3",
"dataPath": "s3://my_bucket/path/to/your/parquet.parquet",
"credential": {
"accessKey": "xxxx",
"secretKey": "xxxx"
}
},
"mergeField": "id",
"newFields": [
{
"fieldName": "date",
"dataType": "VARCHAR",
"params": {
"maxLength": 10
}
}
]
}'
上記のコマンドを実行する前に、注意が必要なフィールドがいくつかあります。
-
dbNameとcollectionNameこれら2つのパラメータは、データマージ操作のソースコレクションを決定します。
-
destDbNameとdestCollectionNameこれら2つのパラメータは、データマージ操作後に生成されるターゲットコレクションを決定します。ターゲットコレクションは、ソースコレクションと同じクラスターに属していることに注意してください。
-
dataSourceこのパラメータはオプションで、データソースタイプやソースコレクションからのデータとマージされ、ターゲットコレクションに保存される列方向データを含むParquetファイルのパスなどのデータソース設定を含みます。
中間ストレージとしてS3互換のオブジェクトストレージバケットを使用する場合は、
typeをs3に設定した後にdataPathとcredentialを設定する必要があります。📘注意dataPathパラメータの値は、バケットのルートに対するファイルの絶対パス、または複数のParquetファイルを格納するバケット内のフォルダにすることができます。値がフォルダを指す場合は、フォルダ内のParquetファイルが同じデータ構造を持っていることを確認してください。
たとえば、値は
s3://path/to/your/file.parquet(ファイル)またはs3://path/to/your/folder/(フォルダ)にすることができます。- データのないフィールドを追加する場合は、このパラメータを指定しないままにできます。
-
mergeFieldデータマージ操作は、リレーショナルデータベースシステムのLEFT JOIN操作に似ており、マージフィールドはソースコレクションと列方向データを含むParquetファイル間の共有キーとして機能します。
-
newFieldsこれは、データマージ操作後にターゲットコレクションに追加するフィールドのスキーマのリストです。サポートされているデータ型はVACHAR、INT8、INT16、INT32、INT64、FLOAT、DOUBLE、およびBOOLです。
上記のコマンドはデータマージジョブを作成し、そのIDを返します。
{
"code": 0,
"data": {
"jobId": "job-xxxxxxxxxxxxxxxxxxxxx"
}
}
データのないフィールドを追加
マージデータAPIを使用して、既存のコレクションにフィールドを追加する回避策として使用することもできます。この場合、データソースを設定する必要はありません。
export BASE_URL="https://api.cloud.zilliz.com"
export TOKEN="YOUR_API_KEY"
curl --request POST \
--url "${BASE_URL}/v2/etl/merge" \
--header "Authorization: Bearer ${TOKEN}" \
--header "Content-Type: application/json" \
-d '{
"clusterId": "in00-xxxxxxxxxxxxxxx",
"dbName": "my_database",
"collectionName": "my_collection",
"destDbName": "my_database",
"destCollectionName": "my_merged_collection",
"mergeField": "id",
"newFields": [
{
"fieldName": "date",
"dataType": "VARCHAR",
"params": {
"maxLength": 10
}
}
]
}'
上記のコマンドはデータマージジョブを作成し、そのIDを返します。
{
"code": 0,
"data": {
"jobId": "job-xxxxxxxxxxxxxxxxxxxxx"
}
}
結果の確認
データマージジョブのIDを取得した後、ジョブの詳細またはプロジェクトジョブの管理に記載されている手順を使用して、そのステータスを詳細に確認できます。
データマージジョブが完了すると、ターゲットコレクションのスキーマとターゲットコレクション内のエンティティ数が期待通りであるかどうかを確認できます。
トラブルシューティング
-
Parquetファイルの行にソースコレクションのエンティティと一致しないマージキーがある場合、どのように対処すればよいですか?
リレーショナルデータベースシステムのLEFT JOIN操作と同様に、データマージ操作はソースコレクションからのすべての行と、指定されたParquetファイルからの一致する行を結合します。これにより、ソースからのすべてのフィールド、newFieldsで定義されたフィールド、および結合されたデータを含む新しい宛先コレクションが作成されます。
ソースコレクションのマージキーと一致するマージキーを持つParquetファイルからの行のみがマージされます。ソースコレクションのエンティティと一致しないマージキーを持つ行はスキップされます。Parquetファイルの行がいずれもエンティティと一致しない場合、構成されている場合は
newFieldsで指定されたフィールドのみがnull値で作成されます。