
Import Data (SDK)

This guide describes how to use our SDKs to import data into a collection with the bulk-writer and bulk-import APIs.

Alternatively, you can refer to our fast-track end-to-end course, which covers both data preparation and data import into Zilliz Cloud collections.

Install dependencies

Run the following command in your terminal to install pymilvus and minio or upgrade them to the latest version.

python3 -m pip install --upgrade pymilvus minio
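
To confirm that the installation succeeded, you can optionally print the installed pymilvus version:

python3 -c "import pymilvus; print(pymilvus.__version__)"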

Check prepared data

Once you have prepared your data using the BulkWriter tool and obtained the path to the prepared files, you are ready to import them into a Zilliz Cloud collection. To check whether the files are in place, do as follows:

from minio import Minio

# Third-party constants
ACCESS_KEY = "YOUR_ACCESS_KEY"
SECRET_KEY = "YOUR_SECRET_KEY"
BUCKET_NAME = "YOUR_BUCKET_NAME"
REMOTE_PATH = "YOUR_REMOTE_PATH"

client = Minio(
    endpoint="storage.googleapis.com",  # use 's3.amazonaws.com' for AWS S3
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    secure=True
)

objects = client.list_objects(
    bucket_name=BUCKET_NAME,
    prefix=REMOTE_PATH,
    recursive=True
)

print([obj.object_name for obj in objects])

# Output
#
# [
# "folder/1/claps.npy",
# "folder/1/id.npy",
# "folder/1/link.npy",
# "folder/1/publication.npy",
# "folder/1/reading_time.npy",
# "folder/1/responses.npy",
# "folder/1/title.npy",
# "folder/1/vector.npy"
# ]
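
As an optional sanity check before importing, you can total the size of the prepared files. The snippet below is a minimal sketch that reuses the client, BUCKET_NAME, and REMOTE_PATH defined above:

# Optional: total the size of the prepared files
objects = client.list_objects(
    bucket_name=BUCKET_NAME,
    prefix=REMOTE_PATH,
    recursive=True
)
total_bytes = sum(obj.size for obj in objects)
print(f"Prepared files take up {total_bytes} bytes in total")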

Create collection

Once your data files are ready, connect to your Zilliz Cloud cluster, create a collection from the schema, and import the data from the files in the storage bucket.

from pymilvus import MilvusClient, DataType

# set up your collection

## Zilliz Cloud constants
CLUSTER_ENDPOINT = "YOUR_CLUSTER_ENDPOINT"
TOKEN = "YOUR_CLUSTER_TOKEN"
API_KEY = "YOUR_CLUSTER_TOKEN"
CLUSTER_ID = "YOUR_CLUSTER_ID" # inxx-xxxxxxxxxxxxxxx
CLUSTER_REGION = "YOUR_CLUSTER_REGION"
COLLECTION_NAME = "medium_articles"

## Third-party constants
OBJECT_URL = "YOUR_OBJECT_URL"

# create a milvus client
client = MilvusClient(
    uri=CLUSTER_ENDPOINT,
    token=TOKEN
)

# prepare schema
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=False
)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(field_name="publication", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)

schema.verify()

# prepare index parameters
index_params = MilvusClient.prepare_index_params()

index_params.add_index(
    field_name="vector",
    index_type="AUTOINDEX",
    metric_type="L2"
)

client.create_collection(
    collection_name=COLLECTION_NAME,
    schema=schema,
    index_params=index_params
)

In the code snippet above, fill in CLUSTER_ENDPOINT, TOKEN, API_KEY, CLUSTER_ID, and CLOUD_REGION with your own values.

You can obtain CLOUD_REGION and CLUSTER_ID from your cluster's public endpoint. For instance, in the public endpoint https://in03-3bf3c31f4248e22.api.aws-us-east1.zillizcloud.com, CLOUD_REGION is aws-us-east1 and CLUSTER_ID is in03-3bf3c31f4248e22.
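
If you prefer to derive these values programmatically, the following minimal sketch splits the public endpoint, assuming it always follows the https://<CLUSTER_ID>.api.<CLOUD_REGION>.zillizcloud.com pattern shown above:

from urllib.parse import urlparse

endpoint = "https://in03-3bf3c31f4248e22.api.aws-us-east1.zillizcloud.com"

# The hostname looks like <cluster-id>.api.<cloud-region>.zillizcloud.com
hostname = urlparse(endpoint).hostname
cluster_id, _, cloud_region, *_ = hostname.split(".")

print(cluster_id)   # in03-3bf3c31f4248e22
print(cloud_region) # aws-us-east1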

Import data

Once your data and collection are ready, you can start the import process as follows:

from pymilvus import bulk_import
# If this import fails on your pymilvus version, use `from pymilvus.bulk_writer import bulk_import` instead.

# Bulk-import your data from the prepared data files

res = bulk_import(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    object_url=OBJECT_URL,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    cluster_id=CLUSTER_ID,  # Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
    collection_name=COLLECTION_NAME
)

print(res.json())

# Output
#
# {
# "code": 200,
# "data": {
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a"
# }
# }
📘 Notes

For a successful data import, ensure that the target collection has fewer than 10 running or pending import jobs.
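
If you want to guard against this limit before submitting a new job, one option is to count unfinished jobs with the list_import_jobs call described later in this guide. The snippet below is a minimal sketch; the set of terminal states is an assumption, since only ImportCompleted appears in the sample output in this guide:

from pymilvus import list_import_jobs

res = list_import_jobs(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    cluster_id=CLUSTER_ID,
    page_size=100,
    current_page=1,
)

# Assumption: any non-terminal state counts as running or pending.
TERMINAL_STATES = {"ImportCompleted", "ImportFailed"}
unfinished = [
    task for task in res.json()["data"]["tasks"]
    if task["collectionName"] == COLLECTION_NAME
    and task["state"] not in TERMINAL_STATES
]
print(f"{len(unfinished)} running or pending import jobs for {COLLECTION_NAME}")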

Check import progress

You can check the progress of a specified bulk-import job.

import time

from pymilvus import get_import_progress

job_id = res.json()['data']['jobId']
res = get_import_progress(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    job_id=job_id,
    cluster_id=CLUSTER_ID  # Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
)

# check the bulk-import progress

while res.json()["data"]["readyPercentage"] < 1:
    time.sleep(5)

    res = get_import_progress(
        url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
        api_key=API_KEY,
        job_id=job_id,
        cluster_id=CLUSTER_ID
    )

print(res.json())

# Output
#
# {
# "code": 200,
# "data": {
# "collectionName": "medium_articles",
# "fileName": "folder/1/",
# "fileSize": 26571700,
# "readyPercentage": 1,
# "completeTime": "2023-10-28T06:51:49Z",
# "errorMessage": null,
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a",
# "details": [
# {
# "fileName": "folder/1/",
# "fileSize": 26571700,
# "readyPercentage": 1,
# "completeTime": "2023-10-28T06:51:49Z",
# "errorMessage": null
# }
# ]
# }
# }
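
In production code, it may be worth wrapping this polling loop in a helper with a timeout so that a stalled job cannot block forever. The function below, wait_for_import, is a hypothetical convenience wrapper, not part of pymilvus:

def wait_for_import(job_id, timeout_s=3600, poll_interval_s=5):
    """Poll a bulk-import job until it finishes or the timeout elapses.

    A hypothetical helper, not part of pymilvus.
    """
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        res = get_import_progress(
            url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
            api_key=API_KEY,
            job_id=job_id,
            cluster_id=CLUSTER_ID
        )
        data = res.json()["data"]
        if data.get("errorMessage"):
            raise RuntimeError(f"Import job {job_id} failed: {data['errorMessage']}")
        if data["readyPercentage"] >= 1:
            return data
        time.sleep(poll_interval_s)
    raise TimeoutError(f"Import job {job_id} did not finish within {timeout_s} seconds")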

List all import jobs

To list all bulk-import jobs, call the list-import-jobs API as follows:

from pymilvus import list_import_jobs

# list bulk-import jobs

res = list_import_jobs(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    cluster_id=CLUSTER_ID,  # Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
    page_size=10,
    current_page=1,
)

print(res.json())

# Output
#
# {
# "code": 200,
# "data": {
# "tasks": [
# {
# "collectionName": "medium_articles",
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a",
# "state": "ImportCompleted"
# },
# {
# "collectionName": "medium_articles",
# "jobId": "53632e6c-c078-4476-b840-10c4793d9c08",
# "state": "ImportCompleted"
# }
# ],
# "count": 2,
# "currentPage": 1,
# "pageSize": 10
# }
# }
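
When there are more jobs than fit on one page, you can keep requesting pages until you have collected them all. A minimal sketch, assuming that the count field reports the total number of jobs across all pages:

# Collect jobs from every page.
# Assumption: "count" is the total number of jobs across all pages.
all_tasks = []
page = 1
while True:
    res = list_import_jobs(
        url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
        api_key=API_KEY,
        cluster_id=CLUSTER_ID,
        page_size=10,
        current_page=page,
    )
    data = res.json()["data"]
    all_tasks.extend(data["tasks"])
    if len(all_tasks) >= data["count"]:
        break
    page += 1

print(f"Collected {len(all_tasks)} import jobs in total")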