メインコンテンツまでスキップ
バージョン: User Guides (Cloud)

データインポートハンズオン

Zilliz Cloudへのデータのインポートを、データの準備や収集の設定から実際のデータインポートの過程まで、迅速に開始するためのファストトラックコースです。このチュートリアルでは、以下のことを学びます:

  • スキーマを定義し、ターゲットコレクションを設定する方法

  • BulkWriterを使用してソースデータ準備し、リモートストレージバケットに書き込む方法

  • バルクインポートAPIを呼び出してデータをインポートする方法

始める前に

スムーズな体験を確保するために、以下の設定を完了していることを確認してください。

Zilliz Cloudクラスタのセットアップ

  • まだ作成していない場合は、クラスタを作成します。

  • これらの詳細を収集してください:クラスターエンドポイントAPIキークラスターID

依存関係のインストール

現在、PythonまたはJavaでデータインポート関連APIを使用できます。

Python APIを使用するには、ターミナルで次のコマンドを実行して、pymilvusminioをインストールするか、最新バージョンにアップグレードしてください。

python3 -m pip install --upgrade pymilvus minio

リモートストレージバケットの設定

  • リモートバケットを設定するには、AWSS 3、Google GCS、またはAzureBlobを使用します。

  • メモしてください

    • S 3互換ブロックストレージサービスのアクセスキーシークレットキー、およびバケット名

    • AccountNameAccountKey、およびContainerNameMicrosoft Azure BLOBストレージサービス。

    これらの詳細は、バケットがホストされているクラウドプロバイダのコンソールで確認できます。

サンプルコードの使用を強化するために、構成の詳細を格納するために変数を使用することをお勧めします

## The value of the URL is fixed.
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
API_KEY=""

# Configs for Zilliz Cloud cluster
CLUSTER_ENDPOINT=""
CLUSTER_ID="" # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
COLLECTION_NAME="zero_to_hero"

# Configs for remote bucket
BUCKET_NAME=""
ACCESS_KEY=""
SECRET_KEY=""

ターゲットコレクションスキーマを設定する

上記の出力に基づいて、ターゲットコレクションのスキーマを作成できます。

次のデモでは、事前に定義されたスキーマに最初の4つのフィールドを含め、他の4つを動的フィールドとして使用します。

from pymilvus import MilvusClient, DataType

# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)

DIM = 512

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True),
schema.add_field(field_name="bool", datatype=DataType.BOOL),
schema.add_field(field_name="int8", datatype=DataType.INT8),
schema.add_field(field_name="int16", datatype=DataType.INT16),
schema.add_field(field_name="int32", datatype=DataType.INT32),
schema.add_field(field_name="int64", datatype=DataType.INT64),
schema.add_field(field_name="float", datatype=DataType.FLOAT),
schema.add_field(field_name="double", datatype=DataType.DOUBLE),
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512),
schema.add_field(field_name="json", datatype=DataType.JSON),
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM),
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM),
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM),
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM),
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

schema.verify()

print(schema)

上記のコードのパラメータは次のように説明されています

  • フィールド:

    • idはプライマリフィールドです。

    • float_vectorは浮動小数点ベクトル場です。

    • binary_vectorはバイナリベクトル場です。

    • float 16_vectorは、半精度の浮動小数点ベクトルフィールドです。

    • sparse_vectorは疎ベクトル場です。

    • 残りのフィールドはスカラーフィールドです。

  • auto_id=Falseの場合

    これがデフォルト値です。Trueに設定すると、BulkWriterはプライマリフィールドを生成ファイルに含めません。

  • Enable_Dynamicフィールドを有効にする

    値のデフォルトはFalseです。これをTrueに設定すると、BulkWriterは未定義のフィールドと生成されたファイルの値をキーと値のペアとして含め、予約済みのJSONフィールド**$meta**に配置します。

スキーマが設定されたら、次のようにターゲットコレクションを作成できます。

from pymilvus import MilvusClient

# 1. Set up a Milvus client
client = MilvusClient(
uri=CLUSTER_ENDPOINT,
token=API_KEY
)

# 2. Set index parameters
index_params = MilvusClient.prepare_index_params()

index_params.add_index(
field_name="float_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

index_params.add_index(
field_name="binary_vector",
index_type="AUTOINDEX",
metric_type="HAMMING"
)

index_params.add_index(
field_name="float16_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

index_params.add_index(
field_name="sparse_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

# 3. Create collection
client.create_collection(
collection_name=COLLECTION_NAME,
schema=schema,
index_params=index_params
)

ソースデータを準備する

BulkWriterは、データセットをJSON、Parquet、またはNumPyファイルに書き換えることができます。RemoteBulkWriterを作成し**、**これらの形式にデータを書き換えます。

RemoteBulkWriterの作成

スキーマが準備できたら、スキーマを使用してRemoteBulkWriterを作成できます。RemoteBulkWriterは、リモートバケットにアクセスする権限を要求します。リモートバケットにアクセスするための接続パラメータをConnectParamオブジェクトで設定し、RemoteBulkWriterで参照する必要がありま


from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
# Use `from pymilvus import RemoteBulkWriter, BulkFileType`
# if your pymilvus version is earlier than 2.4.2

# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="s3.amazonaws.com", # Use "storage.googleapis.com" for Google Cloud Storage
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=True
)

📘ノート

エンドポイントパラメーターは、クラウドプロバイダーのストレージサービスURIを参照します。

S 3互換ストレージサービスの場合、使用可能なURIは次のとおりです。

  • s3.amazonaws.comAWSの場合

  • storage.googleapis.com(GCSの)

Azure BLOBストレージコンテナーの場合は、次のような有効な接続文字列を使用する必要があります。

DefaultEndpointsProtocol=https;アカウント名=<アカウント名>;アカウントキー=<アカウントキー>;エンドポイントサフィックス=core.windows.net

次に、RemoteBulkWriterの接続パラメーターを次のように参照できます。

writer = RemoteBulkWriter(
schema=schema, # Target collection schema
remote_path="/", # Output directory relative to the remote bucket root
segment_size=1024*1024*1024, # Maximum segment size when segmenting the raw data
connect_param=conn, # Connection parameters defined above
file_type=BulkFileType.PARQUET # Type of the generated file.
)

# Possible file types:
# - BulkFileType.JSON_RB,
# - BulkFileType.NPY, and
# - BulkFileType.PARQUET

上記のライターはJSON形式のファイルを生成し、指定されたバケットのルートフォルダにアップロードします。

  • リモートパスの設定

    これにより、リモートバケット内の生成されたファイルの出力パスが決定されます。

    リモートバケットのルートフォルダに生成されたファイルを"/"に設定すると、RemoteBulkWriterがリモートバケットのルートフォルダに配置します。他のパスを使用する場合は、リモートバケットのルートからの相対パスに設定してください。

  • file_type=ファイルタイプ

    生成されるファイルの種類を決定します。可能な値は以下の通りです:

    • BulkFileType. JSON_RBファイル

    • バルクファイルタイプ. PARQUET

    • 一括ファイルタイプ. NPY

  • セグメントサイズ=1024*1024*102 4

    これにより、BulkWriterが生成されたファイルをセグメント化するかどうかが決まります。デフォルト値は1024 MB(102410241024)です。データセットに多数のレコードが含まれている場合は、sement_sizeを適切な値に設定してデータをセグメント化することをお勧めします。

ライターを使用する

ライターには2つの方法があります。1つはソースデータセットから行を追加するためのもので、もう1つはデータをリモートファイルにコミットするためのものです。

次のようにソースデータセットから行を追加できます:

import random, string, json
import numpy as np
import tensorflow as tf

def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits

return ''.join(random.choices(letters + digits, k=length))

# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector

# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector

# # optional input for bfloat16 vector:
# # 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# # 2. numpy array of bfloat16
# def gen_bf16_vector(to_numpy_arr):
# raw_vector = [random.random() for _ in range(DIM)]
# if to_numpy_arr:
# return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
# return raw_vector

# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector

# optional input for sparse vector:
# only accepts dict like {2: 13.23, 45: 0.54} or {"indices": [1, 2], "values": [0.1, 0.2]}
# note: no need to sort the keys
def gen_sparse_vector(pair_dict: bool):
raw_vector = {}
dim = random.randint(2, 20)
if pair_dict:
raw_vector["indices"] = [i for i in range(dim)]
raw_vector["values"] = [random.random() for _ in range(dim)]
else:
for i in range(dim):
raw_vector[i] = random.random()
return raw_vector

for i in range(2000):
writer.append_row({
"id": np.int64(i),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i%128),
"int16": np.int16(i%1000),
"int32": np.int32(i%100000),
"int64": np.int64(i),
"float": np.float32(i/3),
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
# "bfloat16_vector": gen_bf16_vector(True),
"sparse_vector": gen_sparse_vector(True),
f"dynamic_{i}": i,
})
if (i+1)%1000 == 0:
writer.commit()
print('committed')

print(writer.batch_files)

ライターの**append_row()**メソッドは、行の辞書を受け取ります。

行ディクショナリには、すべてのスキーマ定義フィールドをキーとして含める必要があります。動的フィールドが許可されている場合は、未定義フィールドも含めることができます。詳細は、「BulkWriterを使う」を参照してください。

BulkWriterは、**commit()**メソッドを呼び出した後にのみファイルを生成します。

writer.commit()

今まで、BulkWriterは指定されたリモートバケットにソースデータを準備しています。

生成されたファイルを確認するには、ライターのdata_pathプロパティを印刷して実際の出力パスを取得できます。

print(writer.data_path)

# /5868ba87-743e-4d9e-8fa6-e07b39229425
📘ノート

BulkWriterはUUIDを生成し、指定された出力ディレクトリにUUIDを使用してサブフォルダを作成し、生成されたすべてのファイルをサブフォルダに配置します。

詳細は、「BulkWriterを使う」を参照してください。

準備されたデータをインポートする

このステップの前に、準備したデータが目的のバケットにすでにアップロードされていることを確認してください。

インポートを開始

準備したソースデータをインポートするには、次のように**bulk_import()**関数を呼び出す必要があります。

from pymilvus.bulk_writer import bulk_import

# Publicly accessible URL for the prepared data in the remote bucket
object_url = "s3://{0}/{1}/".format(BUCKET_NAME, str(writer.data_path)[1:])
# Change `s3` to `gs` for Google Cloud Storage

resp = bulk_import(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
collection_name=COLLECTION_NAME,
object_url=object_url,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY
)

job_id = resp.json()['data']['jobId']
print(job_id)

# job-0103f039ccdq9aip1xd4rf
📘ノート

object_urlは、リモートバケット内のファイルまたはフォルダへの有効なURLである必要があります。提供されたコードでは、format()メソッドを使用して、バケット名とライターによって返されたデータパスを組み合わせて、有効なオブジェクトURLを作成します。

データとターゲットコレクションがAWSによってホストされている場合、オブジェクトURLはs 3://remote-bucket/file-pathに似ている必要があります。ライターによって返されたデータパスのプレフィックスに適用可能なURIについては、「ストレージオプション」を参照してください。

タスクの進捗を確認する

次のコードは、5秒ごとに一括インポートの進行状況をチェックし、進行状況をパーセンテージで出力します。

import time
from pymilvus import get_import_progress

job_id = res.json()['data']['jobId']

res = get_import_progress(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID, # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
job_id=job_id,
)

print(res.json()["data"]["progress"])

# check the bulk-import progress
while res.json()["data"]["progress"] < 100:
time.sleep(5)

res = get_import_progress(
url=CLOUD_API_ENDPOINT,
api_key=API_KEY,
job_id=job_id,
cluster_id=CLUSTER_ID
)

print(res.json()["data"]["progress"])

# 0 -- import progress 0%
# 49 -- import progress 49%
# 100 -- import finished
📘ノート

geti mportgress()内のurlを、ターゲットコレクションのクラウドリージョンに対応するものに置き換えてください。

すべての一括インポートジョブを次のように一覧表示できます。

from pymilvus import list_import_jobs

res = list_import_jobs(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
)

print(res.json())

# {
# "code": 0,
# "data": {
# "records": [
# {
# "collectionName": "zero_to_hero",
# "jobId": "job-01f36d8fd67u94avjfnxi0",
# "state": "Completed"
# }
# ],
# "count": 1,
# "currentPage": 1,
# "pageSize": 10
# }
# }

まとめ

このコースでは、データのインポートの全過程をカバーしました。以下にまとめるアイデアをいくつか紹介します。

  • ターゲットコレクションのスキーマを計算するためにデータを調べてください。

  • BulkWriterを使用する場合、以下の点に注意してください。

    • 各行に追加するキーとして、スキーマ定義フィールドをすべて含めます。動的フィールドが許可されている場合は、適用可能な未定義フィールドも含めます。

    • すべての行を追加した後に**commit()**を呼び出すことを忘れないでください。

  • bulk_import**()**を使用する場合、準備したデータをホストするクラウドプロバイダのエンドポイントと、ライターが返すデータパスを連結してオブジェクトURLを構築します。