Version: User Guides (BYOC)

Use BulkWriter

If your data does not meet the requirements listed in Prepare Source Data, you can use BulkWriter, a data processing tool available in pymilvus and the Milvus Java SDK, to prepare it.

Overview

BulkWriter is a tool that converts raw datasets into a format suitable for import through the Zilliz Cloud console, the BulkInsert APIs of the Milvus SDKs, or the RESTful Import API. It offers two types of writers:

  • LocalBulkWriter: Reads the designated dataset and converts it into files of the specified format (JSON, Parquet, or NumPy) in a local directory.

  • RemoteBulkWriter: Performs the same task as the LocalBulkWriter but additionally transfers the converted data files to a specified remote object storage bucket.

Procedure

Set up dependencies

Run the following command in the shell to install pymilvus or upgrade your pymilvus to the latest version.

pip install --upgrade pymilvus
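The import statements in this guide depend on the installed pymilvus release: 2.4.2 and later expose the writers under `pymilvus.bulk_writer`, while earlier releases export them from `pymilvus`. The helper below is a small sketch of that rule. `bulk_writer_import_path` is a hypothetical name, not part of pymilvus, and it only looks at the leading major.minor.patch digits of the version string.

```python
import re


def bulk_writer_import_path(version: str) -> str:
    """Return the module to import LocalBulkWriter/RemoteBulkWriter from.

    Hypothetical helper encoding this guide's note: pymilvus 2.4.2 and later
    use pymilvus.bulk_writer; earlier releases use the top-level package.
    """
    # Keep only the leading "major.minor.patch" digits, e.g. "2.4.10" -> (2, 4, 10)
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    parts = tuple(int(p) for p in match.groups())
    return "pymilvus.bulk_writer" if parts >= (2, 4, 2) else "pymilvus"
```

To check your own environment, you could pass it the installed version, for example `bulk_writer_import_path(importlib.metadata.version("pymilvus"))`.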

Set up a collection schema

Decide on the schema for the collection you wish to import your dataset into. This involves selecting which fields to include from the dataset.

The following code creates a collection schema that covers the supported field data types (the BFLOAT16_VECTOR field is commented out). It also disables automatic ID generation for the primary field (auto_id=False) and enables dynamic fields.

from pymilvus import MilvusClient, DataType

# Derive the collection schema from your dataset.
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=True
)

DIM = 512

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="bool", datatype=DataType.BOOL)
schema.add_field(field_name="int8", datatype=DataType.INT8)
schema.add_field(field_name="int16", datatype=DataType.INT16)
schema.add_field(field_name="int32", datatype=DataType.INT32)
schema.add_field(field_name="int64", datatype=DataType.INT64)
schema.add_field(field_name="float", datatype=DataType.FLOAT)
schema.add_field(field_name="double", datatype=DataType.DOUBLE)
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="json", datatype=DataType.JSON)
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM)
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM)
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM)
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

schema.verify()
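Because the schema declares several vectors with DIM = 512, it can help to know roughly how much raw data each one adds per row. The sketch below is plain arithmetic, not a pymilvus API: the string keys only mirror the DataType names, and the figures ignore file-format and compression overhead. It assumes 4 bytes per FLOAT_VECTOR dimension, 2 bytes per FLOAT16_VECTOR or BFLOAT16_VECTOR dimension, and 1 bit per BINARY_VECTOR dimension (so a binary vector's dimension must be a multiple of 8).

```python
DIM = 512  # same value as in the schema above

# Bytes per dimension for the dense vector types (binary handled separately).
BYTES_PER_DIM = {
    "FLOAT_VECTOR": 4,     # float32
    "FLOAT16_VECTOR": 2,   # IEEE half precision
    "BFLOAT16_VECTOR": 2,  # brain floating point
}


def vector_bytes(kind: str, dim: int) -> int:
    """Approximate raw payload size of one vector, in bytes."""
    if kind == "BINARY_VECTOR":
        # One bit per dimension, packed into whole bytes.
        if dim % 8 != 0:
            raise ValueError("binary vector dim must be a multiple of 8")
        return dim // 8
    if kind not in BYTES_PER_DIM:
        raise ValueError(f"unsupported kind: {kind}")
    return BYTES_PER_DIM[kind] * dim


for kind in ("FLOAT_VECTOR", "BINARY_VECTOR", "FLOAT16_VECTOR"):
    print(kind, vector_bytes(kind, DIM))
```

With DIM = 512 this prints 2048, 64, and 1024 bytes respectively, which is why the float vector dominates the per-row size in this example.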

Create a BulkWriter

There are two types of BulkWriters available.

  • LocalBulkWriter

    A LocalBulkWriter appends rows from the source dataset and commits them to a local file of the specified format.

    from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
    # Use `from pymilvus import LocalBulkWriter, BulkFileType`
    # when you use pymilvus earlier than 2.4.2

    writer = LocalBulkWriter(
        schema=schema,
        local_path='.',                 # output directory
        chunk_size=1024 * 1024 * 1024,  # 1 GiB per output file
        file_type=BulkFileType.PARQUET
    )

    When creating a LocalBulkWriter, you should:

    • Pass the schema you created to schema.

    • Set local_path to the output directory.

    • Set file_type to the output file type.

    • If your dataset contains a large number of records, set chunk_size to an appropriate value so that the output is split into multiple files.

    For details on parameter settings, refer to LocalBulkWriter in the SDK reference.

    📘Notes

    Only JSON files generated using LocalBulkWriter can be directly imported into Zilliz Cloud.

    Upload files of other types to a bucket in the same cloud region as your target cluster before importing them.

  • RemoteBulkWriter

    Instead of committing appended data to a local file, a RemoteBulkWriter commits it to a remote bucket. Therefore, you need to set up a ConnectParam object before creating a RemoteBulkWriter.


    from pymilvus.bulk_writer import RemoteBulkWriter
    # Use `from pymilvus import RemoteBulkWriter`
    # when you use pymilvus earlier than 2.4.2

    # Placeholder credentials for your bucket
    ACCESS_KEY = "bucket-ak"
    SECRET_KEY = "bucket-sk"
    BUCKET_NAME = "a-bucket"

    # Connection parameters to access the remote bucket
    conn = RemoteBulkWriter.S3ConnectParam(
        endpoint="s3.amazonaws.com",  # use 'storage.googleapis.com' for Google Cloud Storage
        access_key=ACCESS_KEY,
        secret_key=SECRET_KEY,
        bucket_name=BUCKET_NAME,
        secure=True
    )

    from pymilvus.bulk_writer import BulkFileType
    # Use `from pymilvus import BulkFileType`
    # when you use pymilvus earlier than 2.4.2

    writer = RemoteBulkWriter(
        schema=schema,
        remote_path="/",
        connect_param=conn,
        file_type=BulkFileType.PARQUET
    )

    print('bulk writer created.')


    The parameters for creating a RemoteBulkWriter are almost the same as those for a LocalBulkWriter, except that you set remote_path instead of local_path and pass the connection parameters in connect_param. For details on parameter settings, refer to RemoteBulkWriter and ConnectParam in the SDK reference.
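The RemoteBulkWriter example hard-codes the bucket credentials for brevity. A safer pattern is to read them from the environment. The helper below is a hypothetical sketch: `s3_connect_kwargs` and the environment variable names `BUCKET_ACCESS_KEY` and `BUCKET_SECRET_KEY` are assumptions, and the two endpoints are the ones named in the example's comment. It only builds the keyword arguments that the example passes to `S3ConnectParam`.

```python
import os

# Endpoints taken from the comment in the RemoteBulkWriter example.
ENDPOINTS = {
    "aws": "s3.amazonaws.com",
    "gcp": "storage.googleapis.com",
}


def s3_connect_kwargs(provider: str, bucket_name: str, env=os.environ) -> dict:
    """Build keyword arguments for RemoteBulkWriter.S3ConnectParam.

    Hypothetical helper: reads credentials from BUCKET_ACCESS_KEY and
    BUCKET_SECRET_KEY (assumed variable names) instead of hard-coding them.
    """
    try:
        endpoint = ENDPOINTS[provider]
    except KeyError:
        raise ValueError(f"unknown provider: {provider!r}") from None
    return {
        "endpoint": endpoint,
        "access_key": env["BUCKET_ACCESS_KEY"],
        "secret_key": env["BUCKET_SECRET_KEY"],
        "bucket_name": bucket_name,
        "secure": True,
    }
```

You could then create the connection with `conn = RemoteBulkWriter.S3ConnectParam(**s3_connect_kwargs("aws", "a-bucket"))`. Passing `env` explicitly also makes the helper easy to test without touching real credentials.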

Start writing

A BulkWriter writes data with two methods: append_row() adds a row from the source dataset, and commit() writes the added rows to a local file or a remote bucket.
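To make the append-then-commit cycle concrete, the toy class below imitates it with the standard library only. `ToyBulkWriter` is hypothetical and is not how pymilvus implements BulkWriter: it buffers rows in memory and writes each committed batch to a numbered JSON file, which is not necessarily a layout Zilliz Cloud can import. Its `batch_files` attribute borrows the name of the writer property used later in this guide.

```python
import json
import tempfile
from pathlib import Path


class ToyBulkWriter:
    """Hypothetical stand-in that mimics the append_row()/commit() cycle."""

    def __init__(self, local_path: str):
        self._dir = Path(local_path)
        self._dir.mkdir(parents=True, exist_ok=True)
        self._buffer = []      # rows appended since the last commit
        self.batch_files = []  # one list of file paths per commit

    def append_row(self, row: dict) -> None:
        self._buffer.append(row)

    def commit(self) -> None:
        if not self._buffer:
            return  # nothing to flush
        path = self._dir / f"{len(self.batch_files) + 1}.json"
        path.write_text(json.dumps(self._buffer))
        self.batch_files.append([str(path)])
        self._buffer = []


with tempfile.TemporaryDirectory() as tmp:
    w = ToyBulkWriter(tmp)
    for i in range(2500):
        w.append_row({"id": i})
        if (i + 1) % 1000 == 0:
            w.commit()
    w.commit()  # flush the remaining 500 rows
    print(len(w.batch_files))  # 3
```

In this toy, the final `commit()` matters: without it, the 500 rows appended after the last multiple of 1,000 would stay in the buffer and never reach disk.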

For demonstration purposes, the following code appends randomly generated data.

import random, string, json
import numpy as np
# import tensorflow as tf  # only needed by the commented-out gen_bf16_vector()

def generate_random_str(length=5):
    letters = string.ascii_uppercase
    digits = string.digits

    return ''.join(random.choices(letters + digits, k=length))

# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
    raw_vector = [random.randint(0, 1) for i in range(DIM)]
    if to_numpy_arr:
        # pack DIM bits into DIM / 8 uint8 values
        return np.packbits(raw_vector, axis=-1)
    return raw_vector

# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
    raw_vector = [random.random() for _ in range(DIM)]
    if to_numpy_arr:
        return np.array(raw_vector, dtype="float32")
    return raw_vector

# # optional input for bfloat16 vector:
# # 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# # 2. numpy array of bfloat16
# def gen_bf16_vector(to_numpy_arr):
#     raw_vector = [random.random() for _ in range(DIM)]
#     if to_numpy_arr:
#         return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
#     return raw_vector

# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
    raw_vector = [random.random() for _ in range(DIM)]
    if to_numpy_arr:
        return np.array(raw_vector, dtype=np.float16)
    return raw_vector

# optional input for sparse vector:
# only accepts dict like {2: 13.23, 45: 0.54} or {"indices": [1, 2], "values": [0.1, 0.2]}
# note: no need to sort the keys
def gen_sparse_vector(pair_dict: bool):
    raw_vector = {}
    dim = random.randint(2, 20)
    if pair_dict:
        raw_vector["indices"] = [i for i in range(dim)]
        raw_vector["values"] = [random.random() for _ in range(dim)]
    else:
        for i in range(dim):
            raw_vector[i] = random.random()
    return raw_vector

for i in range(10000):
    writer.append_row({
        "id": np.int64(i),
        "bool": True if i % 3 == 0 else False,
        "int8": np.int8(i % 128),
        "int16": np.int16(i % 1000),
        "int32": np.int32(i % 100000),
        "int64": np.int64(i),
        "float": np.float32(i / 3),
        "double": np.float64(i / 7),
        "varchar": f"varchar_{i}",
        "json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
        "array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
        "array_int": np.array([k for k in range(10)], np.dtype("int64")),
        "float_vector": gen_float_vector(True),
        "binary_vector": gen_binary_vector(True),
        "float16_vector": gen_fp16_vector(True),
        # "bfloat16_vector": gen_bf16_vector(True),
        "sparse_vector": gen_sparse_vector(True),
        f"dynamic_{i}": i,  # not in the schema, so stored as a dynamic field
    })
    # commit every 1,000 rows
    if (i + 1) % 1000 == 0:
        writer.commit()
        print('committed')
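The generator functions above rely on numpy for two conversions worth understanding. `np.packbits` turns DIM bits into DIM / 8 bytes with the most significant bit first (its default `bitorder="big"`), and the sparse generator can emit either of the two dictionary shapes the comments list. The stdlib sketch below reproduces both conversions. `pack_bits` and `to_pair_dict` are hypothetical helpers, and `to_pair_dict` sorts indices only to make its output deterministic, since the comments note that sorting is not required.

```python
def pack_bits(bits):
    """Pack a list of 0/1 ints into bytes, most significant bit first.

    Mirrors np.packbits with its default bitorder="big".
    """
    if len(bits) % 8 != 0:
        raise ValueError("bit count must be a multiple of 8")
    out = bytearray()
    for start in range(0, len(bits), 8):
        byte = 0
        for bit in bits[start:start + 8]:
            byte = (byte << 1) | (1 if bit else 0)
        out.append(byte)
    return bytes(out)


def to_pair_dict(sparse):
    """Normalize either sparse format to {"indices": [...], "values": [...]}."""
    if "indices" in sparse and "values" in sparse:
        return {"indices": list(sparse["indices"]), "values": list(sparse["values"])}
    indices = sorted(sparse)  # sorted only for deterministic output
    return {"indices": indices, "values": [sparse[i] for i in indices]}


print(list(pack_bits([1, 0, 1, 1, 0, 0, 1, 0])))  # [178]
print(to_pair_dict({45: 0.54, 2: 13.23}))  # {'indices': [2, 45], 'values': [13.23, 0.54]}
```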

Dynamic schema support

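In the previous section, the writer references a schema that enables dynamic fields, so rows can include fields that the schema does not define.

For demonstration purposes, the following code appends randomly generated data. It assumes a simpler schema than the one above: an id primary field and a 768-dimensional vector field, with dynamic fields enabled. Because dynamic_field_1 and dynamic_field_2 are not defined in that schema, they are stored as dynamic fields.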

import random
import string

def generate_random_string(length=5):
    letters = string.ascii_uppercase
    digits = string.digits

    return ''.join(random.choices(letters + digits, k=length))

for i in range(10000):
    writer.append_row({
        "id": i,
        "vector": [random.uniform(-1, 1) for _ in range(768)],
        "dynamic_field_1": random.choice([True, False]),
        "dynamic_field_2": random.randint(0, 100)
    })

writer.commit()
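Conceptually, each appended row is split against the schema: keys the schema declares become regular fields, and any other key is kept in the dynamic field, which shows up as the `$meta.npy` file in the NumPy folder listings of this guide. The sketch below illustrates only that split. `split_row` and `SCHEMA_FIELDS` are hypothetical, the sketch assumes the id/vector schema used in this section, and it does no type or dimension checking.

```python
SCHEMA_FIELDS = frozenset({"id", "vector"})  # hypothetical: fields declared in the schema


def split_row(row: dict, schema_fields=SCHEMA_FIELDS):
    """Separate declared fields from keys that would go to the dynamic field."""
    declared = {k: v for k, v in row.items() if k in schema_fields}
    dynamic = {k: v for k, v in row.items() if k not in schema_fields}
    missing = set(schema_fields) - set(declared)
    if missing:
        raise ValueError(f"row is missing schema fields: {sorted(missing)}")
    return declared, dynamic


declared, dynamic = split_row({
    "id": 1,
    "vector": [0.1, 0.2],
    "dynamic_field_1": True,
    "dynamic_field_2": 42,
})
print(sorted(declared))  # ['id', 'vector']
print(dynamic)           # {'dynamic_field_1': True, 'dynamic_field_2': 42}
```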

Verify the result

To check the results, print the data_path property of the writer to get the actual output path. The batch_files property lists the files generated by each commit.

print(writer.data_path)

# PosixPath('/folder/5868ba87-743e-4d9e-8fa6-e07b39229425')

print(writer.batch_files)

BulkWriter generates a UUID, creates a sub-folder named after the UUID in the specified output directory, and places all generated files in that sub-folder.
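To inspect what was written without relying on writer properties, you can walk the output directory. The helper below is a hypothetical sketch that assumes each UUID sub-folder sits directly under the directory you pass in, as described above; it lists files recursively so that numbered NumPy chunk folders are included. The demo builds a fake layout in a temporary directory rather than running BulkWriter, so point it at a dedicated local_path when you use it for real.

```python
import tempfile
from pathlib import Path


def list_output_files(output_dir: str) -> dict:
    """Map each sub-folder under output_dir to the files it contains."""
    result = {}
    for sub in sorted(Path(output_dir).iterdir()):
        if sub.is_dir():
            # rglob also picks up files inside numbered chunk folders (1/, 2/, ...)
            result[sub.name] = sorted(
                p.relative_to(sub).as_posix() for p in sub.rglob("*") if p.is_file()
            )
    return result


with tempfile.TemporaryDirectory() as tmp:
    fake = Path(tmp) / "45ae1139-1d87-4aff-85f5-0039111f9e6b"
    (fake / "1").mkdir(parents=True)
    (fake / "1" / "id.npy").touch()
    (fake / "1" / "$meta.npy").touch()
    listing = list_output_files(tmp)

print(listing)
```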

Possible folder structures are as follows:

  • If the generated file does not exceed the specified chunk size

    # JSON
    ├── folder
    │   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │       └── 1.json

    # Parquet
    ├── folder
    │   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │       └── 1.parquet

    # Numpy
    ├── folder
    │   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │       ├── id.npy
    │       ├── vector.npy
    │       ├── scalar_1.npy
    │       ├── scalar_2.npy
    │       └── $meta.npy

    File Type   Valid Import Paths
    JSON        s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/
                s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/1.json
    Parquet     s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/
                s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/1.parquet
    NumPy       s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/
                s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/*.npy

  • If the generated files exceed the specified chunk size

    # The following assumes that two chunks are generated.

    # JSON
    ├── folder
    │   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │       ├── 1.json
    │       └── 2.json

    # Parquet
    ├── folder
    │   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │       ├── 1.parquet
    │       └── 2.parquet

    # Numpy
    ├── folder
    │   └── 45ae1139-1d87-4aff-85f5-0039111f9e6b
    │       ├── 1
    │       │   ├── id.npy
    │       │   ├── vector.npy
    │       │   ├── scalar_1.npy
    │       │   ├── scalar_2.npy
    │       │   └── $meta.npy
    │       └── 2
    │           ├── id.npy
    │           ├── vector.npy
    │           ├── scalar_1.npy
    │           ├── scalar_2.npy
    │           └── $meta.npy

    File Type   Valid Import Paths
    JSON        s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/
    Parquet     s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/
    NumPy       s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/
                s3://remote_bucket/folder/45ae1139-1d87-4aff-85f5-0039111f9e6b/*.npy
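Whether you end up with one file or several depends on how much data accumulates relative to chunk_size. The function below gives a rough estimate and is an assumption-laden sketch, not pymilvus logic: it treats chunk_size as a hard byte limit, counts only an approximate per-row payload, and ignores explicit commit() calls, which also end a batch. The per-row figure in the demo covers just the three dense vectors of the example schema at DIM = 512, before scalar fields.

```python
import math


def estimate_chunk_files(num_rows: int, bytes_per_row: int, chunk_size: int) -> int:
    """Rough count of output files if data is split at about chunk_size bytes."""
    if num_rows <= 0:
        return 0
    total = num_rows * bytes_per_row
    return max(1, math.ceil(total / chunk_size))


# float32 (2048 B) + float16 (1024 B) + binary (64 B) = 3136 B per row
row_bytes = 512 * 4 + 512 * 2 + 512 // 8
one_gib = 1024 * 1024 * 1024  # the chunk_size used in the LocalBulkWriter example

print(estimate_chunk_files(10_000, row_bytes, one_gib))     # 1
print(estimate_chunk_files(1_000_000, row_bytes, one_gib))  # 3
```

With 1,000,000 rows the vectors alone come to about 3.1 GB, a little under three times the 1 GiB chunk size, so the estimate rounds up to three files.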