メインコンテンツまでスキップ
バージョン: User Guides (Cloud)

BulkWriterの使用

データ形式が要件を満たしていない場合は、pymilvusおよびMilvusのJava SDKにあるBulkWriterというデータ処理ツールを使用してデータを準備できます。

概要

BulkWriterは、生データセットをZilliz Cloudコンソール、Milvus SDKのBulkInsert API、またはREST形式のImport APIなどのさまざまな方法でインポートするのに適した形式に変換するように設計されたスクリプトです。以下の2種類のライターを提供します:

  • LocalBulkWriter:指定されたデータセットを読み込み、使いやすい形式に変換します。

  • RemoteBulkWriterLocalBulkWriterと同じタスクを実行しますが、変換されたデータファイルを指定されたリモートオブジェクトストレージバケットに追加で転送します。

手順

依存関係をセットアップ

シェルで以下のコマンドを実行して、pymilvusをインストールするか、pymilvusを最新バージョンにアップグレードしてください。

pip install --upgrade pymilvus

コレクションスキーマをセットアップ

データセットをインポートするコレクションのスキーマを決定します。これには、データセットから含めるフィールドを選択することが含まれます。

以下のコードは、すべての可能なデータ型を持つコレクションスキーマを作成します。さらに、スキーマでは主フィールドの自動インクリメントを無効にし、動的フィールドを有効にします。

from pymilvus import MilvusClient, DataType

# データセットからコレクションスキーマを導き出す必要があります。
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)

DIM = 512

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True),
schema.add_field(field_name="bool", datatype=DataType.BOOL),
schema.add_field(field_name="int8", datatype=DataType.INT8),
schema.add_field(field_name="int16", datatype=DataType.INT16),
schema.add_field(field_name="int32", datatype=DataType.INT32),
schema.add_field(field_name="int64", datatype=DataType.INT64),
schema.add_field(field_name="float", datatype=DataType.FLOAT),
schema.add_field(field_name="double", datatype=DataType.DOUBLE),
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512),
schema.add_field(field_name="json", datatype=DataType.JSON),
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM),
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM),
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM),
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM),
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

schema.verify()

BulkWriterを作成

利用可能なBulkWriterは2種類あります。

  • LocalBulkWriter

    LocalBulkWriterはソースデータセットから行を追加し、指定された形式のローカルファイルにコミットします。

    from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
    # pymilvus 2.4.2より前のバージョンでは `from pymilvus import LocalBulkWriter, BulkFileType` を使用

    writer = LocalBulkWriter(
    schema=schema,
    local_path='.',
    chunk_size=1024 * 1024 * 1024,
    file_type=BulkFileType.PARQUET
    )

    LocalBulkWriterを作成する際には:

    • 作成したスキーマをschemaで参照してください。

    • local_pathを出力ディレクトリに設定してください。

    • file_typeを出力ファイルタイプに設定してください。

    • データセットに多数のレコードが含まれている場合は、segment_sizeを適切な値に設定してデータを分割することをお勧めします。

    パラメータ設定の詳細については、SDKリファレンスのLocalBulkWriterを参照してください。

    📘注意

    LocalBulkWriterを使用して生成されたJSONファイルおよびParquetファイルは、Zilliz Cloudコンソールで直接Zilliz Cloudにインポートできます。

    その他のタイプのファイルについては、インポートする前にバケットにアップロードしてください。ターゲットクラスターと同じクラウドリージョンのバケットにファイルをアップロードすることを推奨します。

  • RemoteBulkWriter

    追加されたデータをローカルファイルにコミットする代わりに、RemoteBulkWriterはリモートバケットにコミットします。したがって、RemoteBulkWriterを作成する前に、ConnectParamオブジェクトをセットアップする必要があります。


    from pymilvus.bulk_writer import RemoteBulkWriter
    # pymilvus 2.4.2より前のバージョンでは `from pymilvus import RemoteBulkWriter` を使用

    # サードパーティの定数
    ACCESS_KEY="bucket-ak"
    SECRET_KEY="bucket-sk"
    BUCKET_NAME="a-bucket"
    REGION_NAME="region-name"

    # リモートバケットにアクセスするための接続パラメータ
    conn = RemoteBulkWriter.S3ConnectParam(
    endpoint="s3.amazonaws.com", # Google Cloud Storageの場合は 'storage.googleapis.com' を使用
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    bucket_name=BUCKET_NAME,
    secure=True,
    region=REGION_NAME
    )

    from pymilvus.bulk_writer import BulkFileType
    # pymilvus 2.4.2より前のバージョンでは `from pymilvus import BulkFileType` を使用

    writer = RemoteBulkWriter(
    schema=schema,
    remote_path="/",
    connect_param=conn,
    file_type=BulkFileType.PARQUET
    )

    print('bulk writer created.')

    接続パラメータの準備ができたら、以下のようにRemoteBulkWriterでそれを参照できます:

    from pymilvus.bulk_writer import RemoteBulkWriter
    # pymilvus 2.4.2より前のバージョンでは `from pymilvus import RemoteBulkWriter` を使用

    writer = RemoteBulkWriter(
    schema=schema,
    remote_path="/",
    connect_param=conn,
    file_type=BulkFileType.PARQUET
    )

    RemoteBulkWriterを作成するためのパラメータは、connect_paramを除いてLocalBulkWriterの場合とほぼ同じです。パラメータ設定の詳細については、SDKリファレンスのRemoteBulkWriterおよびConnectParamを参照してください。

書き込みを開始

BulkWriterには2つのメソッドがあります:append_row() はソースデータセットから行を追加し、commit() は追加された行をローカルファイルまたはリモートバケットにコミットします。

デモンストレーション目的として、以下のコードはランダムに生成されたデータを追加します。

import random, string, json
import numpy as np
import tensorflow as tf

def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits

return ''.join(random.choices(letters + digits, k=length))

# バイナリベクトルのオプション入力:
# 1. [1, 0, 1, 1, 0, 0, 1, 0] などの整数リスト
# 2. uint8のnumpy配列
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector

# 浮動小数点ベクトルのオプション入力:
# 1. [0.56, 1.859, 6.55, 9.45] などの浮動小数点リスト
# 2. float32のnumpy配列
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector

# # bfloat16ベクトルのオプション入力:
# # 1. [0.56, 1.859, 6.55, 9.45] などの浮動小数点リスト
# # 2. bfloat16のnumpy配列
# def gen_bf16_vector(to_numpy_arr):
# raw_vector = [random.random() for _ in range(DIM)]
# if to_numpy_arr:
# return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
# return raw_vector

# float16ベクトルのオプション入力:
# 1. [0.56, 1.859, 6.55, 9.45] などの浮動小数点リスト
# 2. float16のnumpy配列
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector

# スパースベクトルのオプション入力:
# {2: 13.23, 45: 0.54} や {"indices": [1, 2], "values": [0.1, 0.2]} のような辞書のみを許可
# 注:キーをソートする必要はありません
def gen_sparse_vector(pair_dict: bool):
raw_vector = {}
dim = random.randint(2, 20)
if pair_dict:
raw_vector["indices"] = [i for i in range(dim)]
raw_vector["values"] = [random.random() for _ in range(dim)]
else:
for i in range(dim):
raw_vector[i] = random.random()
return raw_vector

for i in range(10000):
writer.append_row({
"id": np.int64(i),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i%128),
"int16": np.int16(i%1000),
"int32": np.int32(i%100000),
"int64": np.int64(i),
"float": np.float32(i/3),
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
# "bfloat16_vector": gen_bf16_vector(True),
"sparse_vector": gen_sparse_vector(True),
f"dynamic_{i}": i,
})
if (i+1)%1000 == 0:
writer.commit()
print('committed')

動的スキーマサポート

前のセクションでは、ライターで動的フィールドを許可するスキーマを参照し、行を追加する際に未定義フィールドを含めることを可能にしました。

デモンストレーション目的として、以下のコードはランダムに生成されたデータを追加します。

import random
import string

def generate_random_string(length=5):
letters = string.ascii_uppercase
digits = string.digits

return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
writer.append_row({
"id": i,
"vector":[random.uniform(-1, 1) for _ in range(768)],
"dynamic_field_1": random.choice([True, False]),
"dynamic_field_2": random.randint(0, 100)
})

writer.commit()

結果の確認

結果を確認するには、ライターのdata_pathプロパティを出力して実際の出力パスを取得できます。

print(writer.batch_files)

# PosixPath('/folder/5868ba87-743e-4d9e-8fa6-e07b39229425')

BulkWriterはUUIDを生成し、UUIDを使用して提供された出力ディレクトリにサブフォルダを作成し、すべての生成されたファイルをサブフォルダに入れます。準備されたサンプルデータをダウンロードするにはここをクリック

考えられるフォルダ構造は以下の通りです:

  • 生成されたファイルが指定されたセグメントサイズを超えない場合

    # JSON
    ├── folder
    │ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │ └── 1.json

    # Parquet
    ├── folder
    │ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │ └── 1.parquet

    # Numpy
    ├── folder
    │ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │ ├── id.npy
    │ ├── vector.npy
    │ ├── scalar_1.npy
    │ ├── scalar_2.npy
    │ └── $meta.npy

    ファイルタイプ

    有効なインポートパス

    JSON

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/1.json

    Parquet

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/1.parquet

    NumPy

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/.npy*

  • 生成されたファイルが指定されたセグメントサイズを超える場合

    # 以下は2つのセグメントが生成されたと仮定しています。

    # JSON
    ├── folder
    │ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │ ├── 1.json
    │ └── 2.json

    # Parquet
    ├── folder
    │ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │ ├── 1.parquet
    │ └── 2.parquet

    # Numpy
    ├── folder
    │ └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │ ├── 1
    │ │ ├── id.npy
    │ │ ├── vector.npy
    │ │ ├── scalar_1.npy
    │ │ ├── scalar_2.npy
    │ │ └── $meta.npy
    │ └── 2
    │ ├── id.npy
    │ ├── vector.npy
    │ ├── scalar_1.npy
    │ ├── scalar_2.npy
    │ └── $meta.npy

    ファイルタイプ

    有効なインポートパス

    JSON

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/

    Parquet

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/

    NumPy

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/

    s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/.npy*