Import Data (SDK)
This guide walks you through using our SDKs to import data into a collection with the bulk-writer and bulk-import APIs.
Alternatively, you can refer to our fast-track end-to-end course, which covers both data preparation and data import into Zilliz Cloud collections.
Install dependencies
- Python
- Java
Run the following command in your terminal to install pymilvus and minio or upgrade them to the latest version.
python3 -m pip install --upgrade pymilvus minio
- For Apache Maven, append the following to the pom.xml dependencies:
<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.5.9</version>
</dependency>
- For Gradle/Grails, add the following to your build file's dependencies:
compile 'io.milvus:milvus-sdk-java:2.4.0'
compile 'io.minio:minio:8.5.9'
Check prepared data
Once you have prepared your data using the BulkWriter tool and obtained the path to the prepared files, you are ready to import them into a Zilliz Cloud collection. To check whether the files are in place, do as follows:
- Python
- Java
from minio import Minio
# Third-party constants
ACCESS_KEY = "YOUR_ACCESS_KEY"
SECRET_KEY = "YOUR_SECRET_KEY"
BUCKET_NAME = "YOUR_BUCKET_NAME"
REMOTE_PATH = "YOUR_REMOTE_PATH"
client = Minio(
    endpoint="storage.googleapis.com", # use 's3.amazonaws.com' for AWS S3
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    secure=True
)

objects = client.list_objects(
    bucket_name=BUCKET_NAME,
    prefix=REMOTE_PATH,
    recursive=True
)
print([obj.object_name for obj in objects])
# Output
#
# [
# "folder/1/claps.npy",
# "folder/1/id.npy",
# "folder/1/link.npy",
# "folder/1/publication.npy",
# "folder/1/reading_time.npy",
# "folder/1/responses.npy",
# "folder/1/title.npy",
# "folder/1/vector.npy"
# ]
import io.minio.ListObjectsArgs;
import io.minio.MinioClient;
import io.minio.Result;
import io.minio.messages.Item;
// Third-party constants
String ACCESS_KEY = "YOUR_ACCESS_KEY";
String SECRET_KEY = "YOUR_SECRET_KEY";
String BUCKET_NAME = "YOUR_BUCKET_NAME";
String REMOTE_PATH = "YOUR_REMOTE_PATH";
MinioClient minioClient = MinioClient.builder()
    .endpoint("storage.googleapis.com") // use 's3.amazonaws.com' for AWS S3
    .credentials(ACCESS_KEY, SECRET_KEY)
    .build();

Iterable<Result<Item>> results = minioClient.listObjects(
    ListObjectsArgs.builder()
        .bucket(BUCKET_NAME)
        .prefix(REMOTE_PATH)
        .recursive(true)
        .build()
);

for (Result<Item> result : results) {
    System.out.println(result.get().objectName());
}
// Output
// [[1.parquet]]
Create collection
Once your data files are ready, connect to your Zilliz Cloud cluster, create a collection based on the schema, and then import the data files from the storage bucket.
- Python
- Java
from pymilvus import MilvusClient, DataType
# set up your collection
## Zilliz Cloud constants
CLUSTER_ENDPOINT = "YOUR_CLUSTER_ENDPOINT"
TOKEN = "YOUR_CLUSTER_TOKEN"
API_KEY = "YOUR_CLUSTER_TOKEN"
CLUSTER_ID = "YOUR_CLUSTER_ID" # inxx-xxxxxxxxxxxxxxx
CLUSTER_REGION = "YOUR_CLUSTER_REGION"
COLLECTION_NAME = "medium_articles"
## Third-party constants
OBJECT_URL = "YOUR_OBJECT_URL"
# create a milvus client
client = MilvusClient(
    uri=CLUSTER_ENDPOINT,
    token=TOKEN
)

# prepare schema
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=False
)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(field_name="publication", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)

schema.verify()

# prepare index parameters
index_params = MilvusClient.prepare_index_params()

index_params.add_index(
    field_name="vector",
    index_type="AUTOINDEX",
    metric_type="L2"
)

# create the target collection with the schema and index parameters
client.create_collection(
    collection_name=COLLECTION_NAME,
    schema=schema,
    index_params=index_params
)
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
import io.milvus.grpc.DataType;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.param.IndexType;
import io.milvus.param.MetricType;
import io.milvus.param.collection.CreateCollectionParam;
import io.milvus.param.collection.LoadCollectionParam;
import io.milvus.param.index.CreateIndexParam;
// Configs for Zilliz Cloud cluster
String CLUSTER_ENDPOINT = "";
String TOKEN = "";
String API_KEY = "";
String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
String CLOUD_REGION = "";
String COLLECTION_NAME = "";
// Third-party constants
String OBJECT_URL = ""
// Define schema for the target collection
FieldType id = FieldType.newBuilder()
    .withName("id")
    .withDataType(DataType.Int64)
    .withPrimaryKey(true)
    .withAutoID(false)
    .build();

FieldType titleVector = FieldType.newBuilder()
    .withName("vector")
    .withDataType(DataType.FloatVector)
    .withDimension(768)
    .build();

FieldType title = FieldType.newBuilder()
    .withName("title")
    .withDataType(DataType.VarChar)
    .withMaxLength(512)
    .build();

FieldType link = FieldType.newBuilder()
    .withName("link")
    .withDataType(DataType.VarChar)
    .withMaxLength(512)
    .build();

FieldType readingTime = FieldType.newBuilder()
    .withName("reading_time")
    .withDataType(DataType.Int64)
    .build();

FieldType publication = FieldType.newBuilder()
    .withName("publication")
    .withDataType(DataType.VarChar)
    .withMaxLength(512)
    .build();

FieldType claps = FieldType.newBuilder()
    .withName("claps")
    .withDataType(DataType.Int64)
    .build();

FieldType responses = FieldType.newBuilder()
    .withName("responses")
    .withDataType(DataType.Int64)
    .build();

CollectionSchemaParam schema = CollectionSchemaParam.newBuilder()
    .withEnableDynamicField(false)
    .addFieldType(id)
    .addFieldType(titleVector)
    .addFieldType(title)
    .addFieldType(link)
    .addFieldType(readingTime)
    .addFieldType(publication)
    .addFieldType(claps)
    .addFieldType(responses)
    .build();

// Create a collection with the given schema
ConnectParam connectParam = ConnectParam.newBuilder()
    .withUri(CLUSTER_ENDPOINT)
    .withToken(TOKEN)
    .build();

MilvusServiceClient milvusClient = new MilvusServiceClient(connectParam);

CreateCollectionParam collectionParam = CreateCollectionParam.newBuilder()
    .withCollectionName(COLLECTION_NAME)
    .withSchema(schema)
    .build();

milvusClient.createCollection(collectionParam);

// Create an AUTOINDEX index on the vector field
CreateIndexParam indexParam = CreateIndexParam.newBuilder()
    .withCollectionName(COLLECTION_NAME)
    .withFieldName("vector")
    .withIndexType(IndexType.AUTOINDEX)
    .withMetricType(MetricType.L2)
    .build();

milvusClient.createIndex(indexParam);

// Load the collection so it is ready once the import completes
LoadCollectionParam loadCollectionParam = LoadCollectionParam.newBuilder()
    .withCollectionName(COLLECTION_NAME)
    .build();

milvusClient.loadCollection(loadCollectionParam);
In the code snippet above, fill in CLUSTER_ENDPOINT, TOKEN, API_KEY, CLUSTER_ID, and CLOUD_REGION with your own values.
You can obtain CLOUD_REGION and CLUSTER_ID from your cluster's public endpoint. For instance, in the public endpoint https://in03-3bf3c31f4248e22.api.aws-us-east1.zillizcloud.com, CLOUD_REGION is aws-us-east1 and CLUSTER_ID is in03-3bf3c31f4248e22.
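If you prefer to derive these two values programmatically, the following minimal sketch parses them out of the public endpoint. The parse_endpoint helper is illustrative only, not part of any SDK:
from urllib.parse import urlparse

# Illustrative helper (not part of any SDK): split the public endpoint
# into its cluster ID and cloud region components.
def parse_endpoint(endpoint: str):
    # e.g. https://in03-3bf3c31f4248e22.api.aws-us-east1.zillizcloud.com
    host = urlparse(endpoint).netloc or endpoint
    cluster_id, _api, cloud_region, *_ = host.split(".")
    return cluster_id, cloud_region

cluster_id, cloud_region = parse_endpoint(
    "https://in03-3bf3c31f4248e22.api.aws-us-east1.zillizcloud.com"
)
print(cluster_id, cloud_region)
# in03-3bf3c31f4248e22 aws-us-east1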
Import data
Once your data and collection are ready, you can start the import process as follows:
- Python
- Java
from pymilvus import bulk_import
# Note: in newer pymilvus releases the bulk-import utilities live in the
# `pymilvus.bulk_writer` module, e.g. `from pymilvus.bulk_writer import bulk_import`
# Bulk-import your data from the prepared data files
res = bulk_import(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    object_url=OBJECT_URL,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    cluster_id=CLUSTER_ID, # Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
    collection_name=COLLECTION_NAME
)
print(res.json())
# Output
#
# {
# "code": 200,
# "data": {
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a"
# }
# }
import io.milvus.bulkwriter.CloudImport;
import io.milvus.bulkwriter.response.BulkImportResponse;

BulkImportResponse bulkImportResponse = CloudImport.bulkImport(
    CLUSTER_ENDPOINT,
    API_KEY,
    CLUSTER_ID, // Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
    COLLECTION_NAME,
    OBJECT_URL,
    ACCESS_KEY,
    SECRET_KEY
);
// Get import job ID
String jobId = bulkImportResponse.getJobId();
System.out.println(jobId);
// 0f7fe853-d93e-4681-99f2-4719c63585cc
For a successful data import, ensure that the target collection has fewer than 10 running or pending import jobs. You can, for example, guard a new import with a check like the following:
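This is a minimal sketch that counts the active jobs with the list_import_jobs helper covered later in this guide; the ImportFailed state name is an assumption, inferred by analogy with the ImportCompleted state shown in the sample output below:
from pymilvus import list_import_jobs

res = list_import_jobs(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    cluster_id=CLUSTER_ID,
    page_size=100,
    current_page=1,
)

# Count jobs that are neither completed nor failed.
# `ImportFailed` is assumed by analogy with the `ImportCompleted` state shown below.
tasks = res.json()["data"]["tasks"]
active = [t for t in tasks if t["state"] not in ("ImportCompleted", "ImportFailed")]
if len(active) >= 10:
    raise RuntimeError("Too many running or pending import jobs; try again later.")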
Check import progress
You can check the progress of a specific bulk-import job as follows:
- Python
- Java
import time

from pymilvus import get_import_progress

job_id = res.json()['data']['jobId']

res = get_import_progress(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    job_id=job_id,
    cluster_id=CLUSTER_ID # Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
)

# check the bulk-import progress every 5 seconds until the job is done
while res.json()["data"]["readyPercentage"] < 1:
    time.sleep(5)
    res = get_import_progress(
        url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
        api_key=API_KEY,
        job_id=job_id,
        cluster_id=CLUSTER_ID
    )
print(res.json())
# Output
#
# {
# "code": 200,
# "data": {
# "collectionName": "medium_articles",
# "fileName": "folder/1/",
# "fileSize": 26571700,
# "readyPercentage": 1,
# "completeTime": "2023-10-28T06:51:49Z",
# "errorMessage": null,
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a",
# "details": [
# {
# "fileName": "folder/1/",
# "fileSize": 26571700,
# "readyPercentage": 1,
# "completeTime": "2023-10-28T06:51:49Z",
# "errorMessage": null
# }
# ]
# }
# }
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import io.milvus.bulkwriter.response.GetImportProgressResponse;

while (true) {
    System.out.println("Wait 5 seconds to check the bulk-import job state...");
    TimeUnit.SECONDS.sleep(5);

    GetImportProgressResponse getImportProgressResponse = CloudImport.getImportProgress(
        CLUSTER_ENDPOINT,
        API_KEY,
        CLUSTER_ID, // Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
        jobId
    );

    if (getImportProgressResponse.getReadyPercentage().intValue() == 1) {
        System.err.printf("The job %s completed%n", jobId);
        break;
    } else if (StringUtils.isNotEmpty(getImportProgressResponse.getErrorMessage())) {
        System.err.printf("The job %s failed, reason: %s%n", jobId, getImportProgressResponse.getErrorMessage());
        break;
    } else {
        System.err.printf("The job %s is running, progress: %s%n", jobId, getImportProgressResponse.getReadyPercentage());
    }
}
// The job 0f7fe853-d93e-4681-99f2-4719c63585cc is running, progress: 0.01
// The job 0f7fe853-d93e-4681-99f2-4719c63585cc is running, progress: 0.5
// The job 0f7fe853-d93e-4681-99f2-4719c63585cc completed.
List all import jobs
To view all bulk-import jobs for a cluster, call the list-import-jobs API as follows:
- Python
- Java
from pymilvus import list_import_jobs
# list bulk-import jobs
res = list_import_jobs(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    cluster_id=CLUSTER_ID, # Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
    page_size=10,
    current_page=1,
)
print(res.json())
# Output
#
# {
# "code": 200,
# "data": {
# "tasks": [
# {
# "collectionName": "medium_articles",
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a",
# "state": "ImportCompleted"
# },
# {
# "collectionName": "medium_articles",
# "jobId": "53632e6c-c078-4476-b840-10c4793d9c08",
# "state": "ImportCompleted"
# }
# ],
# "count": 2,
# "currentPage": 1,
# "pageSize": 10
# }
# }
import io.milvus.bulkwriter.response.ListImportJobsResponse;

ListImportJobsResponse listImportJobsResponse = CloudImport.listImportJobs(
    CLUSTER_ENDPOINT,
    API_KEY,
    CLUSTER_ID, // Zilliz Cloud Cluster ID, as `in01-xxxxxxxxxxxxxxx`
    10,
    1
);
System.out.println(listImportJobsResponse);
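If your cluster has more jobs than fit on one page, you can page through the results. The following minimal sketch assumes the Python response shape shown above (tasks, count, currentPage, pageSize):
# A minimal sketch, assuming the response shape shown above:
# walk through the pages until all jobs have been collected.
jobs = []
page = 1
while True:
    res = list_import_jobs(
        url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
        api_key=API_KEY,
        cluster_id=CLUSTER_ID,
        page_size=10,
        current_page=page,
    )
    data = res.json()["data"]
    jobs.extend(data["tasks"])
    if len(jobs) >= data["count"]:
        break
    page += 1

print(f"{len(jobs)} import jobs in total")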