Import Data (SDK)
This guide shows how to use our SDKs to import data into a collection with the bulk-writer and bulk-import APIs.
Alternatively, you can refer to our fast-track end-to-end course, which covers both data preparation and data import into Zilliz Cloud collections.
Install dependencies
- Python
- Java
Run the following command in your terminal to install pymilvus and minio or upgrade them to the latest version.
python3 -m pip install --upgrade pymilvus minio
- For Apache Maven, append the following to the pom.xml dependencies:
<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.4.8</version>
</dependency>
<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.5.9</version>
</dependency>
- For Gradle/Grails, run the following:
compile 'io.milvus:milvus-sdk-java:2.4.8'
compile 'io.minio:minio:8.5.9'
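To verify the Python setup, you can import both packages and print their versions. A quick sanity check; both pymilvus and minio expose a __version__ attribute:
import pymilvus
import minio

# Print the installed versions to confirm the install or upgrade took effect
print(pymilvus.__version__)
print(minio.__version__)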
Check prepared data
Once you have prepared your data with the BulkWriter tool and obtained the path to the prepared files, you are ready to import them into a Zilliz Cloud collection. To check whether the files are in place, do as follows:
- Python
- Java
from minio import Minio

# Third-party constants
ACCESS_KEY = "YOUR_ACCESS_KEY"
SECRET_KEY = "YOUR_SECRET_KEY"
BUCKET_NAME = "YOUR_BUCKET_NAME"
REMOTE_PATH = "YOUR_REMOTE_PATH"

client = Minio(
    endpoint="storage.googleapis.com",  # use 's3.amazonaws.com' for AWS S3
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    secure=True
)

objects = client.list_objects(
    bucket_name=BUCKET_NAME,
    prefix=REMOTE_PATH,
    recursive=True
)

print([obj.object_name for obj in objects])
# Output
#
# [
# "folder/1/claps.npy",
# "folder/1/id.npy",
# "folder/1/link.npy",
# "folder/1/publication.npy",
# "folder/1/reading_time.npy",
# "folder/1/responses.npy",
# "folder/1/title.npy",
# "folder/1/vector.npy"
# ]
import io.minio.ListObjectsArgs;
import io.minio.MinioClient;
import io.minio.Result;
import io.minio.messages.Item;

// Third-party constants
String ACCESS_KEY = "YOUR_ACCESS_KEY";
String SECRET_KEY = "YOUR_SECRET_KEY";
String BUCKET_NAME = "YOUR_BUCKET_NAME";
String REMOTE_PATH = "YOUR_REMOTE_PATH";

MinioClient minioClient = MinioClient.builder()
    .endpoint("storage.googleapis.com") // use 's3.amazonaws.com' for AWS S3
    .credentials(ACCESS_KEY, SECRET_KEY)
    .build();

Iterable<Result<Item>> results = minioClient.listObjects(
    ListObjectsArgs.builder().bucket(BUCKET_NAME).prefix(REMOTE_PATH).build()
);

for (Result<Item> result : results) {
    System.out.println(result.get().objectName());
}
// Output
//
// 1.parquet
Import data
Once your data and collection are ready, you can import the data into a specific collection either via a stage or via external storage, such as an object storage bucket or a block storage blob container.
Import data via stage (Private Preview)
To import data via a stage, you need to create a stage and upload your data into it beforehand. For details, refer to Merge Data.
Once the stage is ready and the source data file is in place, you can import data from a stage as follows:
- Python
- Java
from pymilvus.bulk_writer import bulk_import

def cloud_bulkinsert():
    # The value of the URL is fixed.
    # For overseas regions, it is: https://api.cloud.zilliz.com
    # For regions in China, it is: https://api.cloud.zilliz.com.cn
    url = "https://api.cloud.zilliz.com"
    api_key = ""
    cluster_id = "inxx-xxxxxxxxxxxxxxx"
    stage_name = "my-first-stage"
    data_path = "dataPath"

    print("\n===================== import files to cloud vectordb ====================")
    resp = bulk_import(
        url=url,
        api_key=api_key,
        cluster_id=cluster_id,
        collection_name='quick_setup',
        stage_name=stage_name,
        data_paths=[[data_path]]
    )
    print(resp.json())

if __name__ == '__main__':
    # To call the cloud bulk-import API, you need a Zilliz Cloud service (https://zilliz.com/cloud)
    cloud_bulkinsert()
private static String bulkImport() throws InterruptedException {
    /**
     * The value of the URL is fixed.
     */
    String CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com";
    String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
    String API_KEY = "";
    String STAGE_NAME = "my-first-stage";
    List<String> DATA_PATH = Lists.newArrayList("dataPath");

    StageImportRequest stageImportRequest = StageImportRequest.builder()
        .apiKey(API_KEY)
        .clusterId(CLUSTER_ID)
        .collectionName("quick_setup")
        .stageName(STAGE_NAME)
        .dataPaths(Lists.newArrayList(Collections.singleton(DATA_PATH)))
        .build();
    String bulkImportResult = BulkImportUtils.bulkImport(CLOUD_API_ENDPOINT, stageImportRequest);
    System.out.println(bulkImportResult);

    JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
    String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
    System.out.println("Created a bulk-import task, job id: " + jobId);
    return jobId;
}

public static void main(String[] args) throws Exception {
    String jobId = bulkImport();
}
// 0f7fe853-d93e-4681-99f2-4719c63585cc
Import data via external storage
If you prefer to import data via external storage, do as follows:
- Python
- Java
from pymilvus.bulk_writer import bulk_import

# Bulk-import your data from the prepared data files
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""
STORAGE_URL = ""
ACCESS_KEY = ""
SECRET_KEY = ""

res = bulk_import(
    api_key=API_KEY,
    url=CLOUD_API_ENDPOINT,
    cluster_id=CLUSTER_ID,
    collection_name="quick_setup",
    object_url=STORAGE_URL,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY
)

print(res.json())
# Output
#
# {
# "code": 0,
# "data": {
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a"
# }
# }
private static String bulkImport() throws InterruptedException {
    /**
     * The value of the URL is fixed.
     */
    String CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com";
    String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
    String API_KEY = "";
    String STORAGE_URL = "";
    String ACCESS_KEY = "";
    String SECRET_KEY = "";

    CloudImportRequest cloudImportRequest = CloudImportRequest.builder()
        .apiKey(API_KEY)
        .clusterId(CLUSTER_ID)
        .collectionName("quick_setup")
        .objectUrl(STORAGE_URL)
        .accessKey(ACCESS_KEY)
        .secretKey(SECRET_KEY)
        .build();
    String bulkImportResult = BulkImport.bulkImport(CLOUD_API_ENDPOINT, cloudImportRequest);
    System.out.println(bulkImportResult);

    JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
    String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
    System.out.println("Created a bulk-import task, job id: " + jobId);
    return jobId;
}

public static void main(String[] args) throws Exception {
    String jobId = bulkImport();
}
// 0f7fe853-d93e-4681-99f2-4719c63585cc
For successful data import, ensure the target collection has fewer than 10,000 running or pending import jobs.
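If you want to guard against this limit programmatically, one option is to count unfinished jobs with the list-import-jobs API (described below) before submitting a new import. A minimal sketch, assuming the response lists jobs under data.records with a state field whose unfinished values include Pending and Importing; verify these names against your actual responses:
from pymilvus.bulk_writer import list_import_jobs

CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

# The state names below are assumptions; confirm them against your responses.
UNFINISHED_STATES = {"Pending", "Importing"}

resp = list_import_jobs(
    api_key=API_KEY,
    url=CLOUD_API_ENDPOINT,
    cluster_id=CLUSTER_ID
)
records = resp.json().get("data", {}).get("records", [])
unfinished = sum(1 for job in records if job.get("state") in UNFINISHED_STATES)

if unfinished >= 10_000:
    raise RuntimeError(f"{unfinished} import jobs still running or pending; retry later.")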
Check import progress
You can check the progress of a specified bulk-import job.
- Python
- Java
import json

from pymilvus.bulk_writer import get_import_progress

## Zilliz Cloud constants
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

# Get bulk-import job progress
resp = get_import_progress(
    api_key=API_KEY,
    url=CLOUD_API_ENDPOINT,
    cluster_id=CLUSTER_ID,
    job_id="job-01fa0e5d42cjxudhpuehyp",
)

print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
    /**
     * The value of the URL is fixed.
     */
    String CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com";
    String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
    String API_KEY = "";

    CloudDescribeImportRequest request = CloudDescribeImportRequest.builder()
        .apiKey(API_KEY)
        .clusterId(CLUSTER_ID)
        .jobId(jobId)
        .build();
    String getImportProgressResult = BulkImport.getImportProgress(CLOUD_API_ENDPOINT, request);
    System.out.println("Get import progress, result: " + getImportProgressResult);
}

public static void main(String[] args) throws Exception {
    getImportProgress("job-xxxx");
}
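Because import jobs run asynchronously, a common pattern is to poll this endpoint until the job reaches a terminal state. A minimal polling sketch in Python, assuming the job state is reported under data.state with terminal values such as Completed and Failed; confirm these against your own responses:
import time

from pymilvus.bulk_writer import get_import_progress

CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

while True:
    resp = get_import_progress(
        api_key=API_KEY,
        url=CLOUD_API_ENDPOINT,
        cluster_id=CLUSTER_ID,
        job_id="job-01fa0e5d42cjxudhpuehyp",
    )
    # The data.state field and the terminal state names are assumptions.
    state = resp.json().get("data", {}).get("state")
    print(f"Current state: {state}")
    if state in ("Completed", "Failed"):
        break
    time.sleep(5)  # avoid hammering the endpoint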
List all import jobs
To view all bulk-import jobs, call the list-import-jobs API as follows:
- Python
- Java
import json

from pymilvus.bulk_writer import list_import_jobs

## Zilliz Cloud constants
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

# List bulk-import jobs
resp = list_import_jobs(
    api_key=API_KEY,
    url=CLOUD_API_ENDPOINT,
    cluster_id=CLUSTER_ID
)

print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
    /**
     * The value of the URL is fixed.
     */
    String CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com";
    String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
    String API_KEY = "";

    CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder()
        .apiKey(API_KEY)
        .clusterId(CLUSTER_ID)
        .build();
    String listImportJobsResult = BulkImport.listImportJobs(CLOUD_API_ENDPOINT, listImportJobsRequest);
    System.out.println(listImportJobsResult);
}

public static void main(String[] args) throws Exception {
    listImportJobs();
}
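If you want a compact per-job summary instead of the raw JSON, you can parse the response. A sketch under the same assumptions as the pre-flight check above (jobs listed under data.records, each with jobId and state fields):
from collections import Counter

from pymilvus.bulk_writer import list_import_jobs

CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

resp = list_import_jobs(
    api_key=API_KEY,
    url=CLOUD_API_ENDPOINT,
    cluster_id=CLUSTER_ID
)

# The data.records layout and the jobId/state field names are assumptions;
# confirm them against your own responses.
records = resp.json().get("data", {}).get("records", [])
print(Counter(job.get("state") for job in records))
for job in records:
    print(job.get("jobId"), job.get("state"))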