メインコンテンツまでスキップ
バージョン: User Guides (Cloud)

データインポートハンズオン

これは、Zilliz Cloudでのデータインポートを迅速に開始するための短期集中コースです。データ準備からコレクションのセットアップ、実際のデータインポートプロセスまでを網羅しています。このチュートリアルを通して、以下を学ぶことができます:

  • スキーマの定義方法とターゲットコレクションのセットアップ方法

  • BulkWriter を使用してソースデータを準備し、リモートストレージバケットに書き込む方法

  • バルクインポートAPIを呼び出してデータをインポートする方法

はじめに

スムーズな体験を確保するため、以下のセットアップが完了していることを確認してください:

Zilliz Cloudクラスターをセットアップ

  • クラスターを作成(まだ作成していない場合)

  • 以下の情報を収集してください:クラスターエンドポイントAPIキークラスターID

依存関係をインストール

現在、データインポート関連のAPIはPythonまたはJavaで使用できます。

Python APIを使用するには、ターミナルで以下のコマンドを実行して pymilvusminio をインストールするか、最新バージョンにアップグレードしてください。

python3 -m pip install --upgrade pymilvus minio

リモートストレージバケットを設定

  • AWS S3、Google GCS、またはAzure Blobを使用してリモートバケットをセットアップしてください。

  • 次の情報をメモしてください:

    • S3互換ブロックストレージサービスの場合はアクセスキーシークレットキーバケット名

    • Microsoft Azure Blobストレージサービスの場合はアカウント名アカウントキーコンテナ名

    これらの詳細は、バケットがホストされているクラウドプロバイダのコンソールで確認できます。

例示コードの使用を促進するために、構成詳細を保存するための変数を使用することを推奨します:

## URLの値は固定です。
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
API_KEY=""

# Zilliz Cloudクラスターの設定
CLUSTER_ENDPOINT=""
CLUSTER_ID="" # Zilliz CloudクラスターID(例:"in01-xxxxxxxxxxxxxxx")
COLLECTION_NAME="zero_to_hero"

# リモートバケットの設定
BUCKET_NAME=""
ACCESS_KEY=""
SECRET_KEY=""

ターゲットコレクションスキーマをセットアップ

上記の出力に基づいて、ターゲットコレクションのスキーマを導き出すことができます。

以下のデモでは、事前定義されたスキーマに最初の4つのフィールドを含め、残りの4つを動的フィールドとして使用します。

from pymilvus import MilvusClient, DataType

# データセットからコレクションスキーマを導き出す必要があります。
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)

DIM = 512

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True),
schema.add_field(field_name="bool", datatype=DataType.BOOL),
schema.add_field(field_name="int8", datatype=DataType.INT8),
schema.add_field(field_name="int16", datatype=DataType.INT16),
schema.add_field(field_name="int32", datatype=DataType.INT32),
schema.add_field(field_name="int64", datatype=DataType.INT64),
schema.add_field(field_name="float", datatype=DataType.FLOAT),
schema.add_field(field_name="double", datatype=DataType.DOUBLE),
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512),
schema.add_field(field_name="json", datatype=DataType.JSON),
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM),
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM),
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM),
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM),
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

schema.verify()

print(schema)

上記コードのパラメータは以下の通りです:

  • fields:

    • idは主キーフィールドです。

    • float_vectorは浮動小数点ベクトルフィールドです。

    • binary_vectorはバイナリベクトルフィールドです。

    • float16_vectorは半精度浮動小数点ベクトルフィールドです。

    • sparse_vectorはスパース浮動小数点ベクトルフィールドです。

    • その他のフィールドはスカラーフィールドです。

  • auto_id=False

    これがデフォルト値です。これをTrueに設定すると、BulkWriterが生成したファイルに主キーフィールドを含めることを防止します。

  • enable_dynamic_field=True

    デフォルト値はFalseです。これをTrueに設定すると、BulkWriterが生成されたファイルから未定義のフィールドとその値をキーバリューペアとして含め、$metaという名前の予約JSONフィールドに配置することを可能にします。

スキーマが設定されたら、次のようにターゲットコレクションを作成できます:

from pymilvus import MilvusClient

# 1. Milvusクライアントをセットアップ
client = MilvusClient(
uri=CLUSTER_ENDPOINT,
token=API_KEY
)

# 2. インデックスパラメータを設定
index_params = MilvusClient.prepare_index_params()

index_params.add_index(
field_name="float_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

index_params.add_index(
field_name="binary_vector",
index_type="AUTOINDEX",
metric_type="HAMMING"
)

index_params.add_index(
field_name="float16_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

index_params.add_index(
field_name="sparse_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

# 3. コレクションを作成
client.create_collection(
collection_name=COLLECTION_NAME,
schema=schema,
index_params=index_params
)

ソースデータを準備

BulkWriter は、JSON、Parquet、またはNumPyファイルにデータセットを書き換えることができます。RemoteBulkWriterを作成し、このライターを使用してこれらの形式にデータを書き換えます。

RemoteBulkWriterを作成

スキーマが準備できたら、そのスキーマを使用してRemoteBulkWriterを作成できます。RemoteBulkWriter はリモートバケットへのアクセス権を要求します。ConnectParam オブジェクトでリモートバケットへの接続パラメータを設定して、RemoteBulkWriterで参照する必要があります。


from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
# pymilvusのバージョンが2.4.2より古い場合は `from pymilvus import RemoteBulkWriter, BulkFileType`を使用してください

# リモートバケットにアクセスするための接続パラメータ
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="s3.amazonaws.com", # Google Cloud Storageの場合は "storage.googleapis.com" を使用
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=True
)

📘注意

endpointパラメータは、クラウドプロバイダのストレージサービスURIを指します。

S3互換ストレージサービスの場合は、以下のURIが可能です:

  • s3.amazonaws.com(AWS S3)

  • storage.googleapis.com(GCS)

Azure blobストレージコンテナの場合は、以下のような有効な接続文字列を使用する必要があります:

DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.windows.net

次に、RemoteBulkWriterで接続パラメータを以下のように参照できます:

writer = RemoteBulkWriter(
schema=schema, # ターゲットコレクションスキーマ
remote_path="/", # リモートバケットルートに対する出力ディレクトリ
segment_size=1024*1024*1024, # 生データを分割する際の最大セグメントサイズ
connect_param=conn, # 上記で定義した接続パラメータ
file_type=BulkFileType.PARQUET # 生成されるファイルのタイプ
)

# 可能なファイルタイプ:
# - BulkFileType.JSON_RB
# - BulkFileType.NPY
# - BulkFileType.PARQUET

上記のライターはJSON形式でファイルを生成し、指定されたバケットのルートフォルダにアップロードします。

  • remote_path="/"

    これはリモートバケット内の生成ファイルの出力パスを決定します。

    "/"に設定すると、RemoteBulkWriter は生成されたファイルをリモートバケットのルートフォルダに配置します。他のパスを使用するには、リモートバケットルートに対する相対パスを設定してください。

  • file_type=BulkFileType.PARQUET

    これは生成されるファイルのタイプを決定します。可能な値は以下の通りです:

    • BulkFileType.JSON_RB

    • BulkFileType.PARQUET

    • BulkFileType.NPY

  • segment_size=1024*1024*1024

    これはBulkWriterが生成されたファイルを分割するかどうかを決定します。デフォルト値は1024MB(1024 * 1024 * 1024)です。データセットに多数のレコードが含まれている場合は、segment_sizeを適切な値に設定してデータを分割することをお勧めします。

ライターを使用

ライターには2つのメソッドがあります:1つはソースデータセットから行を追加するためのメソッド、もう1つはリモートファイルにデータをコミットするためのメソッドです。

ソースデータセットから行を追加するには、次のようにします:

import random, string, json
import numpy as np
import tensorflow as tf

def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits

return ''.join(random.choices(letters + digits, k=length))

# バイナリベクトルのオプション入力:
# 1. [1, 0, 1, 1, 0, 0, 1, 0] などの整数リスト
# 2. uint8のnumpy配列
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector

# 浮動小数点ベクトルのオプション入力:
# 1. [0.56, 1.859, 6.55, 9.45] などの浮動小数点リスト
# 2. float32のnumpy配列
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector

# # bfloat16ベクトルのオプション入力:
# # 1. [0.56, 1.859, 6.55, 9.45] などの浮動小数点リスト
# # 2. bfloat16のnumpy配列
# def gen_bf16_vector(to_numpy_arr):
# raw_vector = [random.random() for _ in range(DIM)]
# if to_numpy_arr:
# return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
# return raw_vector

# float16ベクトルのオプション入力:
# 1. [0.56, 1.859, 6.55, 9.45] などの浮動小数点リスト
# 2. float16のnumpy配列
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector

# スパースベクトルのオプション入力:
# {2: 13.23, 45: 0.54} や {"indices": [1, 2], "values": [0.1, 0.2]} のような辞書のみを許可
# 注:キーをソートする必要はありません
def gen_sparse_vector(pair_dict: bool):
raw_vector = {}
dim = random.randint(2, 20)
if pair_dict:
raw_vector["indices"] = [i for i in range(dim)]
raw_vector["values"] = [random.random() for _ in range(dim)]
else:
for i in range(dim):
raw_vector[i] = random.random()
return raw_vector

for i in range(2000):
writer.append_row({
"id": np.int64(i),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i%128),
"int16": np.int16(i%1000),
"int32": np.int32(i%100000),
"int64": np.int64(i),
"float": np.float32(i/3),
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
# "bfloat16_vector": gen_bf16_vector(True),
"sparse_vector": gen_sparse_vector(True),
f"dynamic_{i}": i,
})
if (i+1)%1000 == 0:
writer.commit()
print('committed')

print(writer.batch_files)

ライターの**append_row()**メソッドは行の辞書を受け入れます。

行の辞書には、スキーマ定義フィールドのすべてがキーとして含まれている必要があります。動的フィールドが許可されている場合は、未定義のフィールドも含めることができます。詳細については、BulkWriterの使用を参照してください。

BulkWriterは**commit()**メソッドを呼び出した後にのみファイルを生成します。

writer.commit()

これで、BulkWriterは指定したリモートバケットにソースデータを準備しました。

生成されたファイルを確認するには、ライターのdata_pathプロパティを出力して実際の出力パスを取得できます。

print(writer.data_path)

# /5868ba87-743e-4d9e-8fa6-e07b39229425
📘注意

BulkWriterはUUIDを生成し、UUIDを使用して提供された出力ディレクトリにサブフォルダを作成し、すべての生成ファイルをサブフォルダに配置します。

詳細については、BulkWriterの使用を参照してください。

用意されたデータをインポート

このステップの前に、準備されたデータが既に目的のバケットにアップロードされていることを確認してください。

インポートを開始

準備されたソースデータをインポートするには、以下のように**bulk_import()**関数を呼び出す必要があります:

from pymilvus.bulk_writer import bulk_import

# リモートバケットの準備されたデータへのパブリックアクセス可能URL
object_url = "s3://{0}/{1}/".format(BUCKET_NAME, str(writer.data_path)[1:])
# Google Cloud Storageの場合、`s3`を`gs`に変更

resp = bulk_import(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
collection_name=COLLECTION_NAME,
object_url=object_url,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY
)

job_id = resp.json()['data']['jobId']
print(job_id)

# job-0103f039ccdq9aip1xd4rf
📘注意

object_urlはリモートバケット内のファイルまたはフォルダへの有効なURLでなければなりません。提供されたコードでは、format()メソッドを使用してバケット名とライターから返されたデータパスを組み合わせて有効なオブジェクトURLを作成しています。

データとターゲットコレクションがAWSでホストされている場合は、オブジェクトURLはs3://remote-bucket/file-pathに似ている必要があります。ライターから返されたデータパスに接頭辞を付けるために適用可能なURIについては、ストレージオプションを参照してください。

タスクの進行状況を確認

以下のコードは5秒ごとにバルクインポートの進行状況を確認し、進行状況をパーセンテージで出力します。

import time
from pymilvus import get_import_progress

job_id = res.json()['data']['jobId']

res = get_import_progress(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID, # Zilliz CloudクラスターID(例:"in01-xxxxxxxxxxxxxxx")
job_id=job_id,
)

print(res.json()["data"]["progress"])

# バルクインポートの進行状況を確認
while res.json()["data"]["progress"] < 100:
time.sleep(5)

res = get_import_progress(
url=CLOUD_API_ENDPOINT,
api_key=API_KEY,
job_id=job_id,
cluster_id=CLUSTER_ID
)

print(res.json()["data"]["progress"])

# 0 -- インポート進行状況 0%
# 49 -- インポート進行状況 49%
# 100 -- インポート完了
📘注意

getimportprogress()urlを、ターゲットコレクションのクラウドリージョンに対応するものに置き換えてください。

以下のようにすべてのバルクインポートジョブを一覧表示できます:

from pymilvus import list_import_jobs

res = list_import_jobs(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID # Zilliz CloudクラスターID(例:"in01-xxxxxxxxxxxxxxx")
)

print(res.json())

# {
# "code": 0,
# "data": {
# "records": [
# {
# "collectionName": "zero_to_hero",
# "jobId": "job-01f36d8fd67u94avjfnxi0",
# "state": "Completed"
# }
# ],
# "count": 1,
# "currentPage": 1,
# "pageSize": 10
# }
# }

まとめ

このコースでは、データインポートの全プロセスをカバーしました。以下にまとめのアイデアを示します:

  • データを検証して、ターゲットコレクションのスキーマを導き出します。

  • BulkWriterを使用する場合、以下の点に注意してください:

    • 追加する各行に、すべてのスキーマ定義フィールドをキーとして含めてください。動的フィールドが許可されている場合は、適用可能な未定義フィールドも含めてください。

    • すべての行を追加した後に**commit()**を呼び出すことを忘れないでください。

  • **bulk_import()**を使用する場合、クラウドプロバイダのエンドポイントとデータパスを結合してオブジェクトURLを構築します。データパスはライターから返されます。