Version: User Guides (BYOC)

User Guide: Data Import from Zero to Hero

This is a fast-track course to help you quickly start importing data on Zilliz Cloud, from data preparation and collection setup to the actual data import process. Throughout this tutorial, you will learn:

  • How to define a schema and set up a target collection

  • How to prepare source data using BulkWriter and write it to a remote storage bucket

  • How to import data by calling bulk-import APIs

Before you start

To ensure a smooth experience, make sure you have completed the following setups:

Set up your Zilliz Cloud cluster

  • If you have not already, create a cluster.

  • Gather these details: Cluster Endpoint, API Key, Cluster ID, and Cloud Region.

Install dependencies

Currently, you can use data-import-related APIs in Python or Java.

To use the Python API, run the following command in your terminal to install pymilvus and minio or upgrade them to the latest version.

python3 -m pip install --upgrade pymilvus minio

Configure your remote storage bucket

  • Set up a remote bucket using AWS S3.

  • Note down

    • Access Key, Secret Key, and Bucket Name for an S3-compatible object storage service.

    • AccountName, AccountKey, and ContainerName for Microsoft Azure blob storage service.

    These details are available in the console of the cloud provider where your bucket is hosted.

To make the example code easier to reuse, we recommend storing the configuration details in variables:

# Configs for Zilliz Cloud cluster
CLUSTER_ENDPOINT=""
API_KEY=""
TOKEN=""
CLUSTER_ID="" # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
CLOUD_REGION=""
CLOUD_API_ENDPOINT="controller.api.{0}.zillizcloud.com".format(CLOUD_REGION)
COLLECTION_NAME=""

# Configs for remote bucket
ACCESS_KEY=""
SECRET_KEY=""
BUCKET_NAME=""
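If you would rather not hard-code credentials in a script, one option is to read them from environment variables. The following is a minimal sketch of that idea; the load_config() helper and the ZILLIZ_* variable names are assumptions made for illustration and are not defined by Zilliz Cloud or pymilvus.

```python
import os

# Hypothetical environment variable names; pick whatever fits your setup.
REQUIRED_SETTINGS = (
    "ZILLIZ_CLUSTER_ENDPOINT",
    "ZILLIZ_API_KEY",
    "ZILLIZ_CLUSTER_ID",
    "ZILLIZ_CLOUD_REGION",
)

def load_config(env=None):
    """Return the required settings as a dict, failing early if any is unset."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_SETTINGS if not env.get(name)]
    if missing:
        raise KeyError("Missing settings: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_SETTINGS}
```

Failing early on a missing value avoids discovering an empty API key only when the first request is rejected.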

Download example dataset

Run the following command in your terminal to download the example CSV file.

curl https://assets.zilliz.com/doc-assets/medium_articles_partial_a13e0f2a.csv \
--output medium_articles_partial.csv

The output is similar to the following. Alternatively, click here to download the example dataset.

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 5133k  100 5133k    0     0   430k      0  0:00:11  0:00:11 --:--:--  599k

The following table lists the data structure of the data and the values in the first row.

Field Name     Type           Attributes        Sample Value
-------------  -------------  ----------------  ------------------------------------------------
id             INT64          N/A               0
title_vector   FLOAT_VECTOR   Dimension: 768    [0.041732933, 0.013779674, -0.027564144, -0.01…
title          VARCHAR        Max length: 512   The Reported Mortality Rate of Coronavirus Is …
link           VARCHAR        Max length: 512   https://medium.com/swlh/the-reported-mortality…
reading_time   INT64          N/A               13
publication    VARCHAR        Max length: 512   The Startup
claps          INT64          N/A               1100
responses      INT64          N/A               18

The example dataset comprises details of over 5,000 articles from medium.com. Learn about this dataset from its introduction page on Kaggle.

Set up target collection

Based on the output above, we can work out a schema for our target collection.

In the following demo, we will include the first four fields in the pre-defined schema and use the other four as dynamic fields.

from pymilvus import MilvusClient, DataType

# Derive the collection schema from your dataset.
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=True
)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)

The parameters in the above code are described as follows:

  • fields:

    • id is the primary field.

    • title_vector is a vector field that holds 768-dimensional vector embeddings.

    • title and link are two scalar fields.

  • auto_id=False

    This is the default value. Setting this to True prevents BulkWriter from including the primary field in generated files.

  • enable_dynamic_field=True

    The value defaults to False. Setting this to True allows BulkWriter to include undefined fields and their values in the generated files as key-value pairs, placing them in a reserved JSON field named $meta.
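To make the effect of enable_dynamic_field=True concrete, the following plain-Python sketch separates a row into schema-defined fields and a $meta dictionary. It only mimics the behavior described above and is not BulkWriter's actual implementation; split_row() is a hypothetical helper, and the sample values come from the first row of the dataset table.

```python
# The four fields defined in the schema above.
SCHEMA_FIELDS = ("id", "title_vector", "title", "link")

def split_row(row, schema_fields=SCHEMA_FIELDS):
    """Keep schema-defined fields at the top level and group the rest under $meta."""
    result = {key: value for key, value in row.items() if key in schema_fields}
    dynamic = {key: value for key, value in row.items() if key not in schema_fields}
    if dynamic:
        result["$meta"] = dynamic
    return result

sample = {
    "id": 0,
    "title": "example title",
    "reading_time": 13,
    "publication": "The Startup",
    "claps": 1100,
    "responses": 18,
}
print(split_row(sample)["$meta"])
```

With the dataset in this guide, reading_time, publication, claps, and responses are the four fields that end up as dynamic fields.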

Once the schema is set, you can create the target collection as follows:

from pymilvus import MilvusClient

# 1. Set up a Milvus client
client = MilvusClient(
    uri=CLUSTER_ENDPOINT,
    token=TOKEN
)

# 2. Set index parameters
index_params = MilvusClient.prepare_index_params()

index_params.add_index(
    field_name="title_vector",
    index_type="AUTOINDEX",
    metric_type="IP"
)

# 3. Create collection
client.create_collection(
    collection_name=COLLECTION_NAME,
    schema=schema,
    index_params=index_params
)

Prepare source data

BulkWriter can rewrite your dataset into JSON, Parquet, or NumPy files. We will create a RemoteBulkWriter and use the writer to rewrite your data into these formats.

Create RemoteBulkWriter

Once the schema is ready, you can use it to create a RemoteBulkWriter. A RemoteBulkWriter needs permission to access a remote bucket, so you should define the connection parameters for the bucket in a ConnectParam object and reference that object when creating the RemoteBulkWriter.


from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
# Use `from pymilvus import RemoteBulkWriter, BulkFileType`
# if your pymilvus version is earlier than 2.4.2

# Connection parameters to access the remote bucket
conn = RemoteBulkWriter.S3ConnectParam(
    endpoint="storage.googleapis.com", # Use "s3.amazonaws.com" for AWS S3
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    bucket_name=BUCKET_NAME,
    secure=True
)

📘Notes

The endpoint parameter refers to the storage service URI of your cloud provider.

For an S3-compatible storage service, possible URIs are as follows:

  • s3.amazonaws.com (AWS S3)

  • storage.googleapis.com (GCS)

For an Azure blob storage container, you should use a valid connection string similar to the following:

DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.windows.net
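As a small illustration, the connection string in the format above can be assembled from the AccountName and AccountKey you noted down earlier. The azure_connection_string() helper is hypothetical and only performs string formatting; it does not validate the credentials or contact Azure.

```python
def azure_connection_string(account_name, account_key,
                            endpoint_suffix="core.windows.net"):
    """Build a connection string in the format shown above."""
    return (
        "DefaultEndpointsProtocol=https;"
        f"AccountName={account_name};"
        f"AccountKey={account_key};"
        f"EndpointSuffix={endpoint_suffix}"
    )

print(azure_connection_string("<accountName>", "<accountKey>"))
```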

Then, you can reference the connection parameters in the RemoteBulkWriter as follows:

writer = RemoteBulkWriter(
    schema=schema, # Target collection schema
    remote_path="/", # Output directory relative to the remote bucket root
    segment_size=512*1024*1024, # Maximum segment size when segmenting the raw data
    connect_param=conn, # Connection parameters defined above
    file_type=BulkFileType.JSON_RB # Type of the generated file.
)

# Possible file types:
# - BulkFileType.JSON_RB,
# - BulkFileType.NPY, and
# - BulkFileType.PARQUET

The above writer generates files in JSON format and uploads them to the root folder of the specified bucket.

  • remote_path="/"

    This determines the output path of the generated files in the remote bucket.

    Setting it to "/" makes the RemoteBulkWriter place the generated files in the root folder of the remote bucket. To use other paths, set it to a path relative to the remote bucket root.

  • file_type=BulkFileType.JSON_RB

    This determines the type of generated files. Possible values are as follows:

    • BulkFileType.JSON_RB

    • BulkFileType.PARQUET

    • BulkFileType.NPY

  • segment_size=512*1024*1024

    This sets the maximum size of a segment when BulkWriter splits the generated files. The value defaults to 512 MB (512 * 1024 * 1024). If your dataset contains a large number of records, segment your data by setting segment_size to an appropriate value.
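For a rough sense of how segment_size relates to the number of generated segments, the following sketch divides an estimated total data size by the segment size. It is back-of-the-envelope arithmetic only: estimate_segments() is a hypothetical helper, the 4,096-byte average row size is an assumption (a 768-dimensional float32 vector alone takes 768 * 4 = 3,072 bytes), and the actual file sizes depend on the output format.

```python
import math

SEGMENT_SIZE = 512 * 1024 * 1024  # 512 MB, the default mentioned above

def estimate_segments(num_rows, avg_row_bytes, segment_size=SEGMENT_SIZE):
    """Estimate how many segments a dataset of the given size would span."""
    total_bytes = num_rows * avg_row_bytes
    return max(1, math.ceil(total_bytes / segment_size))

# 1,000,000 rows at an assumed 4,096 bytes each is about 4.1 GB of raw data.
print(estimate_segments(1_000_000, 4096))  # 8
```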

Use the writer

A writer has two methods: one is for appending rows from the source dataset, and the other is for committing data to remote files.

You can append rows from the source dataset as follows:

import pandas as pd

df = pd.read_csv("path/to/medium_articles_partial.csv") # Use the actual file path to the dataset

for i in range(len(df)):
    row = df.iloc[i].to_dict()
    row["title_vector"] = [float(x) for x in row["title_vector"][1:-1].split(",")]
    writer.append_row(row)

The append_row() method of the writer accepts a row dictionary.

A row dictionary should contain all schema-defined fields as keys. If dynamic fields are allowed, it can also include undefined fields. For details, refer to Use BulkWriter.
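Before appending a row, it can help to check that it carries every schema-defined field and a vector of the expected dimension. The sketch below shows one way to do this in plain Python; prepare_row() is a hypothetical helper, not part of pymilvus, and it parses the stringified vector with json.loads as an alternative to manual string splitting.

```python
import json

REQUIRED_FIELDS = ("id", "title_vector", "title", "link")  # schema-defined fields
VECTOR_DIM = 768

def prepare_row(raw, dim=VECTOR_DIM):
    """Return a copy of the row with a parsed vector, or raise if it is invalid."""
    missing = [name for name in REQUIRED_FIELDS if name not in raw]
    if missing:
        raise ValueError("Row is missing schema fields: " + ", ".join(missing))
    row = dict(raw)
    vector = row["title_vector"]
    if isinstance(vector, str):
        # CSV cells hold the vector as text such as "[0.1, 0.2, ...]".
        vector = json.loads(vector)
    row["title_vector"] = [float(x) for x in vector]
    if len(row["title_vector"]) != dim:
        raise ValueError(f"Expected {dim} dimensions, got {len(row['title_vector'])}")
    return row
```

A row that passes this check can then be handed to writer.append_row().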

BulkWriter generates files only after you call its commit() method.

writer.commit()

At this point, BulkWriter has prepared the source data for you in the specified remote bucket.

To check the generated files, you can get the actual output path by printing the data_path property of the writer.

print(writer.data_path)

# /5868ba87-743e-4d9e-8fa6-e07b39229425

📘Notes

BulkWriter generates a UUID, creates a sub-folder using the UUID in the provided output directory, and places all generated files in the sub-folder.

For details, refer to Use BulkWriter.

Import prepared data

Before this step, ensure that the prepared data has already been uploaded to the desired bucket.

Start importing

To import the prepared source data, you need to call the bulk_import() function as follows:

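from pymilvus.bulk_writer import bulk_import
# Use `from pymilvus import bulk_import`
# if your pymilvus version is earlier than 2.4.2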

# Publicly accessible URL for the prepared data in the remote bucket
object_url = "gs://{0}/{1}/".format(BUCKET_NAME, str(writer.data_path)[1:])
# Change `gs` to `s3` for AWS S3

# Start bulk-import
res = bulk_import(
    # Parameters for Zilliz Cloud access
    url=CLOUD_API_ENDPOINT,
    api_key=API_KEY,
    cluster_id=CLUSTER_ID, # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
    collection_name=COLLECTION_NAME,
    # Parameters for bucket access
    object_url=object_url,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
)

print(res.json())

# {'code': 200, 'data': {'jobId': '0f7fe853-d93e-4681-99f2-4719c63585cc'}}

📘Notes

The object_url should be a valid URL to a file or folder in the remote bucket. In the code provided, the format() method is used to combine the bucket name and the data path returned by the writer to create a valid object URL.

If the data and the target collection are hosted on Google Cloud, the object URL should be similar to gs://remote-bucket/file-path. For the URI scheme to prepend to the data path returned by the writer, refer to Prepare Source Data.
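The URL construction described in these notes can be wrapped in a small function. This is a sketch only: build_object_url() is a hypothetical helper, and it covers just the gs and s3 schemes mentioned in this guide.

```python
def build_object_url(bucket_name, data_path, scheme="gs"):
    """Join the scheme, bucket name, and writer data path into a folder URL."""
    if scheme not in ("gs", "s3"):
        raise ValueError(f"Unsupported scheme: {scheme}")
    # The writer's data path starts with "/", so strip it to avoid "//".
    path = str(data_path).strip("/")
    return f"{scheme}://{bucket_name}/{path}/"

print(build_object_url("remote-bucket", "/5868ba87-743e-4d9e-8fa6-e07b39229425"))
# gs://remote-bucket/5868ba87-743e-4d9e-8fa6-e07b39229425/
```

Passing scheme="s3" produces the equivalent URL for a bucket on AWS S3.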

Check task progress

The following code checks the bulk-import progress every 5 seconds and outputs the progress in percentage.

import time
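from pymilvus.bulk_writer import get_import_progress
# Use `from pymilvus import get_import_progress`
# if your pymilvus version is earlier than 2.4.2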

job_id = res.json()['data']['jobId']

res = get_import_progress(
    url=CLOUD_API_ENDPOINT,
    api_key=API_KEY,
    job_id=job_id,
    cluster_id=CLUSTER_ID # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
)

print(res.json()["data"]["readyPercentage"])

# check the bulk-import progress
while res.json()["data"]["readyPercentage"] < 1:
    time.sleep(5)

    res = get_import_progress(
        url=CLOUD_API_ENDPOINT,
        api_key=API_KEY,
        job_id=job_id,
        cluster_id=CLUSTER_ID
    )

    print(res.json()["data"]["readyPercentage"])

# 0.01 -- import progress 1%
# 0.5 -- import progress 50%
# 0.5
# 1 -- finished

📘Notes

Replace url in get_import_progress() with the one corresponding to the cloud region of the target collection.
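The polling loop above runs until the job reports completion, so a job that stalls would keep it running indefinitely. One possible safeguard is to add a timeout, as in the sketch below. wait_for_import() is a hypothetical helper; it takes any callable that returns the current progress as a number between 0 and 1, so it does not depend on pymilvus itself.

```python
import time

def wait_for_import(fetch_progress, interval=5.0, timeout=3600.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_progress() until it returns 1 or more, or raise on timeout."""
    deadline = clock() + timeout
    while True:
        progress = fetch_progress()
        if progress >= 1:
            return progress
        if clock() >= deadline:
            raise TimeoutError(f"Import still at {progress:.0%} after {timeout} seconds")
        sleep(interval)
```

In this guide's setting, fetch_progress could be a function that calls get_import_progress() with the job ID and returns res.json()["data"]["readyPercentage"].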

You can list all bulk-import jobs as follows:

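from pymilvus.bulk_writer import list_import_jobs
# Use `from pymilvus import list_import_jobs`
# if your pymilvus version is earlier than 2.4.2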

res = list_import_jobs(
    url=CLOUD_API_ENDPOINT,
    api_key=API_KEY,
    cluster_id=CLUSTER_ID, # Zilliz Cloud cluster ID, like "in01-xxxxxxxxxxxxxxx"
    page_size=10,
    current_page=1,
)

print(res.json())

# {
#     "code": 200,
#     "data": {
#         "tasks": [
#             {
#                 "collectionName": "medium_articles",
#                 "jobId": "0f7fe853-d93e-4681-99f2-4719c63585cc",
#                 "state": "ImportCompleted"
#             }
#         ],
#         "count": 1,
#         "currentPage": 1,
#         "pageSize": 10
#     }
# }
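Once you have the parsed response, you can filter the jobs by state. The following sketch works on a dictionary shaped like the sample output above; jobs_in_state() is a hypothetical helper, and "ImportCompleted" is the only state value shown in this guide.

```python
def jobs_in_state(response, state):
    """Return the IDs of the jobs on this page whose state matches."""
    tasks = response.get("data", {}).get("tasks", [])
    return [task["jobId"] for task in tasks if task.get("state") == state]

# A response shaped like the sample output above.
sample_response = {
    "code": 200,
    "data": {
        "tasks": [
            {
                "collectionName": "medium_articles",
                "jobId": "0f7fe853-d93e-4681-99f2-4719c63585cc",
                "state": "ImportCompleted",
            }
        ],
        "count": 1,
        "currentPage": 1,
        "pageSize": 10,
    },
}

print(jobs_in_state(sample_response, "ImportCompleted"))
# ['0f7fe853-d93e-4681-99f2-4719c63585cc']
```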

Recap

This course covered the entire data import process. Here are the key points to remember:

  • Examine your data to work out the schema of the target collection.

  • When using BulkWriter, note the following:

    • Include all schema-defined fields as keys in each row you append. If dynamic fields are allowed, you can also include applicable undefined fields.

    • Do not forget to call commit() after appending all rows.

  • When using bulk_import(), build the object URL by joining the URI scheme of the storage service hosting the prepared data (for example, gs:// or s3://), the bucket name, and the data path returned by the writer.