Skip to main content
Version: User Guides (Cloud)

Data Import Hands-On

This is a fast-track course to help you quickly start importing data on Zilliz Cloud, from data preparation and collection setup to the actual data import process. Throughout this tutorial, you will learn:

  • How to define a schema and set up a target collection

  • How to prepare source data using BulkWriter and write it to a remote storage bucket

  • How to import data by calling bulk-import APIs

Before you start

To ensure a smooth experience, make sure you have completed the following setups:

Set up your Zilliz Cloud cluster

  • If you have not already, create a cluster.

  • Gather these details: Cluster Endpoint, API Key, Cluster ID.

Install dependencies

Currently, you can use data-import-related APIs in Python or Java.

To use the Python API, run the following command in your terminal to install pymilvus and minio or upgrade them to the latest version.

python3 -m pip install --upgrade pymilvus minio

Configure your remote storage bucket

  • Set up a remote bucket using AWS S3, Google GCS, or Azure Blob.

  • Note down

    • Access Key, Secret Key, and Bucket Name for S3-compatible block storage service.

    • AccountName, AccountKey, and ContainerName for Microsoft Azure blob storage service.

    These details are available in the console of the cloud provider where your bucket is hosted.

To enhance the usage of the example code, we recommend you use variables to store the configuration details:

## The value of the URL is fixed.
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
API_KEY=""

# Configs for Zilliz Cloud cluster
CLUSTER_ENDPOINT=""
CLUSTER_ID="" # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
COLLECTION_NAME="zero_to_hero"

# Configs for remote bucket
BUCKET_NAME=""
ACCESS_KEY=""
SECRET_KEY=""

Set up target collection schema

Based on the output above, we can work out a schema for our target collection.

In the following demo, we will include the first four fields in the pre-defined schema and use the other four as dynamic fields.

from pymilvus import MilvusClient, DataType

# You need to work out a collection schema out of your dataset.
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)

DIM = 512

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True),
schema.add_field(field_name="bool", datatype=DataType.BOOL),
schema.add_field(field_name="int8", datatype=DataType.INT8),
schema.add_field(field_name="int16", datatype=DataType.INT16),
schema.add_field(field_name="int32", datatype=DataType.INT32),
schema.add_field(field_name="int64", datatype=DataType.INT64),
schema.add_field(field_name="float", datatype=DataType.FLOAT),
schema.add_field(field_name="double", datatype=DataType.DOUBLE),
schema.add_field(field_name="varchar", datatype=DataType.VARCHAR, max_length=512),
schema.add_field(field_name="json", datatype=DataType.JSON),
schema.add_field(field_name="array_str", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=128)
schema.add_field(field_name="array_int", datatype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64)
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=DIM),
schema.add_field(field_name="binary_vector", datatype=DataType.BINARY_VECTOR, dim=DIM),
schema.add_field(field_name="float16_vector", datatype=DataType.FLOAT16_VECTOR, dim=DIM),
# schema.add_field(field_name="bfloat16_vector", datatype=DataType.BFLOAT16_VECTOR, dim=DIM),
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

schema.verify()

print(schema)

The parameters in the above code are described as follows:

  • fields:

    • id is the primary field.

    • float_vector is a floating vector field.

    • binary_vector is a binary vector field.

    • float16_vector is a half-precision floating vector field.

    • sparse_vector is a sparse vector field.

    • The rest fields are scalar fields.

  • auto_id=False

    This is the default value. Setting this to True prevents BulkWriter from including the primary field in generated files.

  • enable_dynamic_field=True

    The value defaults to False. Setting this to True allows BulkWriter to include undefined fields and their values from the generated files as key-value pairs and place them in a reserved JSON field named $meta.

Once the schema is set, you can create the target collection as follows:

from pymilvus import MilvusClient

# 1. Set up a Milvus client
client = MilvusClient(
uri=CLUSTER_ENDPOINT,
token=API_KEY
)

# 2. Set index parameters
index_params = MilvusClient.prepare_index_params()

index_params.add_index(
field_name="float_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

index_params.add_index(
field_name="binary_vector",
index_type="AUTOINDEX",
metric_type="HAMMING"
)

index_params.add_index(
field_name="float16_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

index_params.add_index(
field_name="sparse_vector",
index_type="AUTOINDEX",
metric_type="IP"
)

# 3. Create collection
client.create_collection(
collection_name=COLLECTION_NAME,
schema=schema,
index_params=index_params
)

Prepare source data

BulkWriter can rewrite your dataset into JSON, Parquet, or NumPy files. We will create a RemoteBulkWriter and use the writer to rewrite your data into these formats.

Create RemoteBulkWriter

Once the schema is ready, you can use the schema to create a RemoteBulkWriter. A RemoteBulkWriter asks for permission to access a remote bucket. You should set up connection parameters to access the remote bucket in a ConnectParam object and reference it in the RemoteBulkWriter.


from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
# Use `from pymilvus import RemoteBulkWriter, BulkFileType`
# if your pymilvus version is earlier than 2.4.2

# Connections parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
endpoint="s3.amazonaws.com", # Use "storage.googleapis.com" for Google Cloud Storage
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
bucket_name=BUCKET_NAME,
secure=True
)

📘Notes

The endpoint parameter refers to the storage service URI of your cloud provider.

For an S3-compatible storage service, possible URIs are as follows:

  • s3.amazonaws.com(AWS S3)

  • storage.googleapis.com (GCS)

For an Azure blob storage container, you should use a valid connection string similar to the following:

DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.windows.net

Then, you can reference the connection parameters in the RemoteBulkWriter as follows:

writer = RemoteBulkWriter(
schema=schema, # Target collection schema
remote_path="/", # Output directory relative to the remote bucket root
segment_size=1024*1024*1024, # Maximum segment size when segmenting the raw data
connect_param=conn, # Connection parameters defined above
file_type=BulkFileType.PARQUET # Type of the generated file.
)

# Possible file types:
# - BulkFileType.JSON_RB,
# - BulkFileType.NPY, and
# - BulkFileType.PARQUET

The above writer generates files in JSON format and uploads them to the root folder of the specified bucket.

  • remote_path="/"

    This determines the output path of the generated files in the remote bucket.

    Setting it to "/" makes the RemoteBulkWriter place the generated files in the root folder of the remote bucket. To use other paths, set it to a path relative to the remote bucket root.

  • file_type=BulkFileType.PARQUET

    This determines the type of generated files. Possible values are as follows:

    • BulkFileType.JSON_RB

    • BulkFileType.PARQUET

    • BulkFileType.NPY

  • segment_size=1024*1024*1024

    This determines whether BulkWriter segments the generated files. The value defaults to 1024 MB (1024 * 1024 * 1024). If your dataset contains a great number of records, you are advised to segment your data by setting segment_size to a proper value.

Use the writer

A writer has two methods: one is for appending rows from the source dataset, and the other is for committing data to remote files.

You can append rows from the source dataset as follows:

import random, string, json
import numpy as np
import tensorflow as tf

def generate_random_str(length=5):
letters = string.ascii_uppercase
digits = string.digits

return ''.join(random.choices(letters + digits, k=length))

# optional input for binary vector:
# 1. list of int such as [1, 0, 1, 1, 0, 0, 1, 0]
# 2. numpy array of uint8
def gen_binary_vector(to_numpy_arr):
raw_vector = [random.randint(0, 1) for i in range(DIM)]
if to_numpy_arr:
return np.packbits(raw_vector, axis=-1)
return raw_vector

# optional input for float vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float32
def gen_float_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype="float32")
return raw_vector

# # optional input for bfloat16 vector:
# # 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# # 2. numpy array of bfloat16
# def gen_bf16_vector(to_numpy_arr):
# raw_vector = [random.random() for _ in range(DIM)]
# if to_numpy_arr:
# return tf.cast(raw_vector, dtype=tf.bfloat16).numpy()
# return raw_vector

# optional input for float16 vector:
# 1. list of float such as [0.56, 1.859, 6.55, 9.45]
# 2. numpy array of float16
def gen_fp16_vector(to_numpy_arr):
raw_vector = [random.random() for _ in range(DIM)]
if to_numpy_arr:
return np.array(raw_vector, dtype=np.float16)
return raw_vector

# optional input for sparse vector:
# only accepts dict like {2: 13.23, 45: 0.54} or {"indices": [1, 2], "values": [0.1, 0.2]}
# note: no need to sort the keys
def gen_sparse_vector(pair_dict: bool):
raw_vector = {}
dim = random.randint(2, 20)
if pair_dict:
raw_vector["indices"] = [i for i in range(dim)]
raw_vector["values"] = [random.random() for _ in range(dim)]
else:
for i in range(dim):
raw_vector[i] = random.random()
return raw_vector

for i in range(2000):
writer.append_row({
"id": np.int64(i),
"bool": True if i % 3 == 0 else False,
"int8": np.int8(i%128),
"int16": np.int16(i%1000),
"int32": np.int32(i%100000),
"int64": np.int64(i),
"float": np.float32(i/3),
"double": np.float64(i/7),
"varchar": f"varchar_{i}",
"json": json.dumps({"dummy": i, "ok": f"name_{i}"}),
"array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
"array_int": np.array([k for k in range(10)], np.dtype("int64")),
"float_vector": gen_float_vector(True),
"binary_vector": gen_binary_vector(True),
"float16_vector": gen_fp16_vector(True),
# "bfloat16_vector": gen_bf16_vector(True),
"sparse_vector": gen_sparse_vector(True),
f"dynamic_{i}": i,
})
if (i+1)%1000 == 0:
writer.commit()
print('committed')

print(writer.batch_files)

The append_row() method of the writer accepts a row dictionary.

A row dictionary should contain all schema-defined fields as keys. If dynamic fields are allowed, it can also include undefined fields. For details, refer to Use BulkWriter.

BulkWriter generates files only after you call its commit() method.

writer.commit()

Till now, BulkWriter has prepared the source data for you in the specified remote bucket.

To check the generated files, you can get the actual output path by printing the data_path property of the writer.

print(writer.data_path)

# /5868ba87-743e-4d9e-8fa6-e07b39229425
📘Notes

BulkWriter generates a UUID, creates a sub-folder using the UUID in the provided output directory, and places all generated files in the sub-folder.

For details, refer to Use BulkWriter.

Import prepared data

Before this step, ensure that the prepared data has already been uploaded to the desired bucket.

Start importing

To import the prepared source data, you need to call the bulk_import() function as follows:

from pymilvus.bulk_writer import bulk_import

# Publicly accessible URL for the prepared data in the remote bucket
object_url = "s3://{0}/{1}/".format(BUCKET_NAME, str(writer.data_path)[1:])
# Change `s3` to `gs` for Google Cloud Storage

resp = bulk_import(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
collection_name=COLLECTION_NAME,
object_url=object_url,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY
)

job_id = resp.json()['data']['jobId']
print(job_id)

# job-0103f039ccdq9aip1xd4rf
📘Notes

The object_url should be a valid URL to a file or folder in the remote bucket. In the code provided, the format() method is used to combine the bucket name and the data path returned by the writer to create a valid object URL.

If the data and target collection are hosted by AWS, the object URL should be similar to s3://remote-bucket/file-path. For applicable URI to prefix the data path returned by the writer, please refer to Storage Options.

Check task progress

The following code checks the bulk-import progress every 5 seconds and outputs the progress in percentage.

import time
from pymilvus import get_import_progress

job_id = res.json()['data']['jobId']

res = get_import_progress(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID, # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
job_id=job_id,
)

print(res.json()["data"]["progress"])

# check the bulk-import progress
while res.json()["data"]["progress"] < 100:
time.sleep(5)

res = get_import_progress(
url=CLOUD_API_ENDPOINT,
api_key=API_KEY,
job_id=job_id,
cluster_id=CLUSTER_ID
)

print(res.json()["data"]["progress"])

# 0 -- import progress 0%
# 49 -- import progress 49%
# 100 -- import finished
📘Notes

Replace url in the getimportprogress() with the one corresponding to the cloud region of the target collection.

You can list all bulk-import jobs as follows:

from pymilvus import list_import_jobs

res = list_import_jobs(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
)

print(res.json())

# {
# "code": 0,
# "data": {
# "records": [
# {
# "collectionName": "zero_to_hero",
# "jobId": "job-01f36d8fd67u94avjfnxi0",
# "state": "Completed"
# }
# ],
# "count": 1,
# "currentPage": 1,
# "pageSize": 10
# }
# }

Recaps

In this course, we have covered the entire process of importing data, and here are some ideas to recap:

  • Examine your data to work out the schema of the target collection.

  • When using BulkWriter, note the following:

    • Include all schema-defined fields as keys in each row to append. If dynamic fields are allowed, include also applicable undefined fields.

    • Do not forget to call commit() after appending all rows.

  • When using bulk_import(), build the object URL by concatenating the endpoint of the cloud provider hosting the prepared data and the data path returned by the writer.