メインコンテンツまでスキップ
バージョン: User Guides (Cloud)

データインポート実践ガイド

この短期集中コースでは、データ準備、コレクション設定、実際のデータインポート処理まで、Zilliz Cloud でのデータインポートをすばやく始める方法を説明します。このチュートリアルでは、以下について学びます。

  • スキーマを定義し、ターゲットコレクションを設定する方法

  • BulkWriter を使用してソースデータを準備し、リモートストレージバケットに書き込む方法

  • bulk-import API を呼び出してデータをインポートする方法

📘Notes

Zilliz Cloud では、クラスタをホストしているクラウドプロバイダに関係なく、任意のオブジェクトストレージサービスから任意の Zilliz Cloud クラスタにデータをインポートできるようになりました。たとえば、AWS S3 バケットから GCP にデプロイされた Zilliz Cloud クラスタにデータをインポートできます。

低レイテンシで安定したエクスペリエンスを確保するために、ターゲットクラスタと同じプロバイダ、同じリージョンのバケットまたは BLOB コンテナを使用することをお勧めします。

開始前の準備

スムーズに進めるために、以下の設定が完了していることを確認してください。

Zilliz Cloud クラスタを設定する

  • まだ作成していない場合は、クラスタを作成します。

  • Cluster EndpointAPI KeyCluster ID を確認しておきます。

依存関係をインストールする

現在、データインポート関連 API は Python または Java で使用できます。

Python API を使用するには、ターミナルで次のコマンドを実行し、pymilvusminio をインストールするか、最新バージョンにアップグレードします。

python3 -m pip install --upgrade pymilvus minio

リモートストレージバケットを設定する

  • AWS S3、Google GCS、または Azure Blob を使用してリモートバケットを設定します。

  • 以下を控えておきます。

    • S3 互換ブロックストレージサービスの Access KeySecret KeyBucket Name

    • Microsoft Azure Blob Storage サービスの AccountNameAccountKeyContainerName

    これらの情報は、バケットをホストしているクラウドプロバイダのコンソールで確認できます。

サンプルコードを扱いやすくするため、設定情報は変数に保存することをお勧めします。

## The value of the URL is fixed.
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
API_KEY=""

# Configs for Zilliz Cloud cluster
CLUSTER_ENDPOINT=""
CLUSTER_ID="" # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
COLLECTION_NAME="zero_to_hero"

# Configs for remote bucket
BUCKET_NAME=""
ACCESS_KEY=""
SECRET_KEY=""

ターゲットコレクションのスキーマを設定する

上記の出力を基に、ターゲットコレクションのスキーマを設計できます。

以下のデモでは、最初の 4 つのフィールドを事前定義済みスキーマに含め、残りの 4 つを動的フィールドとして使用します。

from pymilvus import MilvusClient, DataType

# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)

DIM = 512

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True),
schema.add_field(field_name="bool", datatype=DataType.BOOL),
schema.add_field(field_name="int8", datatype=DataType.INT8),
schema.add_field(field_name="int16", datatype=DataType.INT16),
schema.add_field(field_name="int32", datatype=DataType.INT32),
schema.add_field(field_name="int64", datatype=DataType.INT64),
schema.add_field(field_name="float", datatype=DataType.FLOAT),
schema.add_field(field_name="double", datatype=DataType.DOUBLE),
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512),
schema.add_field(field_name="json", datatype=DataType.JSON),
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM),
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM),
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM),
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM),
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

schema.verify()

print(schema)

上記コードのパラメータは次のとおりです。

  • フィールド:

    • id は主キーフィールドです。

    • float_vector は浮動小数点ベクトルフィールドです。

    • binary_vector はバイナリベクトルフィールドです。

    • float16_vector は半精度浮動小数点ベクトルフィールドです。

    • sparse_vector はスパースベクトルフィールドです。

    • その他のフィールドはスカラーフィールドです。

  • auto_id=False

    これはデフォルト値です。True に設定すると、BulkWriter は生成ファイルに主キーフィールドを含めません。

  • enable_dynamic_field=True

    デフォルト値は False です。True に設定すると、BulkWriter は未定義フィールドとその値をキーと値のペアとして生成ファイルに含め、$meta という予約済み JSON フィールドに配置できます。

スキーマを設定したら、次のようにターゲットコレクションを作成できます。

from pymilvus import MilvusClient

# 1. Set up a Milvus client
client = MilvusClient(
uri=CLUSTER_ENDPOINT,
token=API_KEY
)

# 2. Set index parameters
index_params = MilvusClient.prepare_index_params()

index_params.add_index(
field_name="float_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

index_params.add_index(
field_name="binary_vector",
index_type="AUTOINDEX",
metric_type="HAMMING"
)

index_params.add_index(
field_name="float16_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

index_params.add_index(
field_name="sparse_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

# 3. Create collection
client.create_collection(
collection_name=COLLECTION_NAME,
schema=schema,
index_params=index_params
)

ソースデータを準備する

BulkWriter はデータセットを JSON、Parquet、または NumPy ファイルに書き換えることができます。ここでは RemoteBulkWriter を作成し、この Writer を使ってデータをこれらの形式に変換します。

RemoteBulkWriter を作成する

スキーマの準備ができたら、そのスキーマを使用して RemoteBulkWriter を作成できます。RemoteBulkWriter はリモートバケットにアクセスする権限を必要とします。リモートバケットにアクセスするための接続パラメータを ConnectParam オブジェクトに設定し、それを RemoteBulkWriter で参照します。


from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
# Use \`from pymilvus import RemoteBulkWriter, BulkFileType\`
# if your pymilvus version is earlier than 2.4.2

# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="s3.amazonaws.com", # Use "storage.googleapis.com" for Google Cloud Storage
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=True
)

📘Notes

endpoint パラメータは、クラウドプロバイダのストレージサービス URI を指します。

S3 互換ストレージサービスで使用できる URI は次のとおりです。

  • s3.amazonaws.com(AWS S3)

  • storage.googleapis.com (GCS)

Azure Blob Storage コンテナの場合は、次のような有効な接続文字列を使用します。

DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.windows.net

次に、RemoteBulkWriter で接続パラメータを次のように参照します。

writer = RemoteBulkWriter(
schema=schema, # Target collection schema
remote_path="/", # Output directory relative to the remote bucket root
segment_size=1024*1024*1024, # Maximum segment size when segmenting the raw data
connect_param=conn, # Connection parameters defined above
file_type=BulkFileType.PARQUET # Type of the generated file.
)

# Possible file types:
# - BulkFileType.JSON,
# - BulkFileType.NPY, and
# - BulkFileType.PARQUET

上記の Writer はファイルを JSON 形式で生成し、指定したバケットのルートフォルダにアップロードします。

  • remote_path="/"

    これは、リモートバケット内で生成ファイルを出力するパスを決定します。

    "/" に設定すると、RemoteBulkWriter は生成ファイルをリモートバケットのルートフォルダに配置します。他のパスを使用する場合は、リモートバケットのルートからの相対パスを指定します。

  • file_type=BulkFileType.PARQUET

    これは生成ファイルの種類を決定します。指定可能な値は次のとおりです。

    • BulkFileType.JSON

    • BulkFileType.PARQUET

    • BulkFileType.NPY

  • segment_size=1024*1024*1024

    これは、BulkWriter が生成ファイルを分割するかどうかを決定します。デフォルト値は 1024 MB(1024 * 1024 * 1024)です。データセットに多数のレコードが含まれる場合は、segment_size を適切な値に設定してデータを分割することをお勧めします。

Writer を使用する

Writer には 2 つのメソッドがあります。1 つはソースデータセットから行を追加するためのもので、もう 1 つはデータをリモートファイルにコミットするためのものです。

ソースデータセットから行を追加するには、次のようにします。

import random, string, json
import numpy as np
import tensorflow as tf

def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits

return ''.join(random.choices(letters + digits, k=length))

# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector

# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector

# # optional input for bfloat16 vector:
# # 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# # 2. numpy array of bfloat16
# def gen_bf16_vector(to_numpy_arr):
# raw_vector = [random.random() for _ in range(DIM)]
# if to_numpy_arr:
# return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
# return raw_vector

# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector

# optional input for sparse vector:
# only accepts dict like {2: 13.23, 45: 0.54} or {"indices": [1, 2], "values": [0.1, 0.2]}
# note: no need to sort the keys
def gen_sparse_vector(pair_dict: bool):
raw_vector = {}
dim = random.randint(2, 20)
if pair_dict:
raw_vector["indices"] = [i for i in range(dim)]
raw_vector["values"] = [random.random() for _ in range(dim)]
else:
for i in range(dim):
raw_vector[i] = random.random()
return raw_vector

for i in range(2000):
writer.append_row({
"id": np.int64(i),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i%128),
"int16": np.int16(i%1000),
"int32": np.int32(i%100000),
"int64": np.int64(i),
"float": np.float32(i/3),
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
# "bfloat16_vector": gen_bf16_vector(True),
"sparse_vector": gen_sparse_vector(True),
f"dynamic_{i}": i,
})
if (i+1)%1000 == 0:
writer.commit()
print('committed')

print(writer.batch_files)

Writer の append_row() メソッドは行ディクショナリを受け取ります。

行ディクショナリには、スキーマで定義されたすべてのフィールドをキーとして含める必要があります。動的フィールドが許可されている場合は、未定義フィールドも含められます。詳細については、BulkWriter の使用を参照してください。

BulkWritercommit() メソッドを呼び出した後にのみファイルを生成します。

writer.commit()

ここまでで、BulkWriter は指定したリモートバケットにソースデータを準備しました。

生成されたファイルを確認するには、Writer の data_path プロパティを出力して実際の出力パスを取得します。

print(writer.data_path)

# /5868ba87-743e-4d9e-8fa6-e07b39229425
📘Notes

BulkWriter は UUID を生成し、指定された出力ディレクトリ内にその UUID を使ったサブフォルダを作成して、すべての生成ファイルをそのサブフォルダに配置します。

詳細については、BulkWriter の使用を参照してください。

準備済みデータをインポートする

この手順を始める前に、準備済みデータが対象のバケットにアップロードされていることを確認してください。

インポートを開始する

準備済みのソースデータをインポートするには、次のように bulk_import() 関数を呼び出します。

from pymilvus.bulk_writer import bulk_import

# Publicly accessible URL for the prepared data in the remote bucket
object_url = "s3://{0}/{1}/".format(BUCKET_NAME, str(writer.data_path)[1:])
# Change \`s3\` to \`gs\` for Google Cloud Storage

resp = bulk_import(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
collection_name=COLLECTION_NAME,
object_url=object_url,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY
)

job_id = resp.json()['data']['jobId']
print(job_id)

# job-0103f039ccdq9aip1xd4rf
📘Notes

object_url は、リモートバケット内のファイルまたはフォルダを指す有効な URL である必要があります。上記のコードでは、format() メソッドを使用してバケット名と Writer が返したデータパスを結合し、有効なオブジェクト URL を作成しています。

データとターゲットコレクションが AWS でホストされている場合、オブジェクト URL は s3://remote-bucket/file-path のようになります。Writer が返したデータパスの前に付ける URI については、ストレージオプションを参照してください。

タスクの進捗を確認する

次のコードは bulk-import の進捗を 5 秒ごとに確認し、進捗をパーセンテージで出力します。

import time
from pymilvus import get_import_progress

job_id = res.json()['data']['jobId']

res = get_import_progress(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID, # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
job_id=job_id,
)

print(res.json()["data"]["progress"])

# check the bulk-import progress
while res.json()["data"]["progress"] < 100:
time.sleep(5)

res = get_import_progress(
url=CLOUD_API_ENDPOINT,
api_key=API_KEY,
job_id=job_id,
cluster_id=CLUSTER_ID
)

print(res.json()["data"]["progress"])

# 0 -- import progress 0%
# 49 -- import progress 49%
# 100 -- import finished
📘Notes

getimportprogress()url は、ターゲットコレクションのクラウドリージョンに対応するものに置き換えてください。

すべての bulk-import ジョブを一覧表示するには、次のようにします。

from pymilvus import list_import_jobs

res = list_import_jobs(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
)

print(res.json())

# {
# "code": 0,
# "data": {
# "records": [
# {
# "collectionName": "zero_to_hero",
# "jobId": "job-01f36d8fd67u94avjfnxi0",
# "state": "Completed"
# }
# ],
# "count": 1,
# "currentPage": 1,
# "pageSize": 10
# }
# }

まとめ

このコースでは、データインポートの一連の流れを説明しました。要点は次のとおりです。

  • データを確認し、ターゲットコレクションのスキーマを設計します。

  • BulkWriter を使用する場合は、以下に注意してください。

    • 追加する各行には、スキーマで定義されたすべてのフィールドをキーとして含めます。動的フィールドが許可されている場合は、該当する未定義フィールドも含めます。

    • すべての行を追加した後、commit() を呼び出すことを忘れないでください。

  • bulk_import() を使用する場合は、準備済みデータをホストしているクラウドプロバイダのエンドポイントと、Writer が返したデータパスを連結してオブジェクト URL を作成します。