RemoteBulkWriter

A RemoteBulkWriter instance rewrites your raw data in a format that Milvus understands and uploads it to a remote storage service, such as an AWS-S3-compatible bucket or an Azure container.

RemoteBulkWriter(RemoteBulkWriterParam bulkWriterParam)

Methods of RemoteBulkWriter:

Method

Description

Parameters

appendRow(JsonObject rowData)

Appends a row to the buffer. Once the buffer size exceeds a threshold, the writer persists the buffer to a data file.

rowData: A gson.JsonObject to store the data of a row.
For each field:
- If dataType is Bool/Int8/Int16/Int32/Int64/Float/Double/Varchar, use JsonObject.addProperty(key, value) to input;
- If dataType is FloatVector, use JsonObject.add(key, gson.toJsonTree(List[Float])) to input;
- If dataType is BinaryVector/Float16Vector/BFloat16Vector, use JsonObject.add(key, gson.toJsonTree(byte[])) to input;
- If dataType is SparseFloatVector, use JsonObject.add(key, gson.toJsonTree(SortedMap[Long, Float])) to input;
- If dataType is Array, use JsonObject.add(key, gson.toJsonTree(List of Boolean/Integer/Short/Long/Float/Double/String)) to input;
- If dataType is JSON, use JsonObject.add(key, JsonElement) to input;
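
As a hedged illustration of the value shapes listed above, the following JDK-only sketch prepares the Java objects that would then be passed to gson.toJsonTree(...) for a BinaryVector field (byte[]) and a SparseFloatVector field (SortedMap of Long to Float). The class RowValueSketch and its methods are hypothetical and not part of the SDK. The MSB-first bit order in packBits is an assumption; confirm it against how your binary vectors were produced.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical helper, not part of the Milvus SDK.
class RowValueSketch {

    // Packs a binary vector (one boolean per dimension) into dim / 8 bytes.
    // Assumption: the first dimension maps to the most significant bit of byte 0.
    static byte[] packBits(boolean[] bits) {
        if (bits.length % 8 != 0) {
            throw new IllegalArgumentException("dimension must be a multiple of 8");
        }
        byte[] packed = new byte[bits.length / 8];
        for (int i = 0; i < bits.length; i++) {
            if (bits[i]) {
                packed[i / 8] |= (byte) (0x80 >>> (i % 8));
            }
        }
        return packed;
    }

    // Builds a sparse vector as index -> value pairs, sorted by index.
    // Zero entries of the dense input are skipped.
    static SortedMap<Long, Float> toSparse(float[] dense) {
        SortedMap<Long, Float> sparse = new TreeMap<>();
        for (int i = 0; i < dense.length; i++) {
            if (dense[i] != 0.0f) {
                sparse.put((long) i, dense[i]);
            }
        }
        return sparse;
    }

    public static void main(String[] args) {
        boolean[] bits = new boolean[16];
        bits[0] = true;   // highest bit of byte 0
        bits[15] = true;  // lowest bit of byte 1
        System.out.println(packBits(bits).length);
        System.out.println(toSparse(new float[] {0f, 0.5f, 0f, 0.25f}));
    }
}
```

The resulting byte[] and SortedMap are plain Java values, so they can be built and checked before any row is appended.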

commit(boolean async)

Forces the writer to persist the buffered data to data files and completes the writer.

async: Set to false to block until all data files are persisted. Set to true to return immediately while the data files are persisted in the background.

getBatchFiles()

Returns a List<List<String>> of the persisted data files. Each List<String> is a batch of files that can be submitted as one job to the bulkinsert interface.

N/A

RemoteBulkWriterParam

Use the RemoteBulkWriterParam.Builder to construct a RemoteBulkWriterParam object.

import io.milvus.bulkwriter.RemoteBulkWriterParam;
RemoteBulkWriterParam.Builder builder = RemoteBulkWriterParam.newBuilder();

Methods of RemoteBulkWriterParam.Builder:

Method

Description

Parameters

withCollectionSchema(CollectionSchemaParam collectionSchema)

Sets the collection schema. See the CollectionSchemaParam description in the Collection.createCollection() section.

collectionSchema: The schema of the target collection.

withConnectParam(StorageConnectParam connectParam)

Sets the connection parameters for the remote storage service. Currently, two options are available: S3ConnectParam and AzureConnectParam.

connectParam: Connect parameters for remote storage service.

withRemotePath(String remotePath)

Sets the path on the remote storage service to which the data files are uploaded.

remotePath: A path on the remote storage service.

withChunkSize(int chunkSize)

Sets the maximum size of a data chunk.
While rewriting your raw data, the writer splits it into chunks.
The value defaults to 128 MB.

chunkSize: The maximum size of a data chunk.
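
To make the unit concrete, here is a small JDK-only sketch of the arithmetic behind a chunkSize value. It assumes the value is a byte count, as the 512 * 1024 * 1024 literal in the example at the end of this page suggests. The class ChunkSizeSketch and its methods are hypothetical, and the chunk estimate is only a rough figure because the size of the files actually written depends on the output encoding.

```java
// Hypothetical helper, not part of the Milvus SDK.
class ChunkSizeSketch {

    // Converts mebibytes to bytes, throwing ArithmeticException instead of
    // silently overflowing the int range.
    static int mebibytes(int mb) {
        return Math.multiplyExact(mb, 1024 * 1024);
    }

    // Rough number of chunks needed for rawBytes of data (ceiling division).
    static long estimateChunks(long rawBytes, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        return (rawBytes + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        int defaultChunk = mebibytes(128); // the documented default
        int largeChunk = mebibytes(512);   // the value used in the example on this page
        System.out.println(defaultChunk);
        System.out.println(largeChunk);
        System.out.println(estimateChunks(3L * 1024 * 1024 * 1024, largeChunk));
    }
}
```

Because the documented parameter is an int, a literal such as 2048 * 1024 * 1024 would overflow; Math.multiplyExact surfaces that mistake as an exception.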

withFileType(BulkFileType fileType)

Sets the type of the output file. Currently, only PARQUET is available.

fileType: The output file type.

build()

Constructs a RemoteBulkWriterParam object.

N/A

AzureConnectParam

Use the AzureConnectParam.Builder to construct an AzureConnectParam object.

import io.milvus.bulkwriter.connect.AzureConnectParam;
AzureConnectParam.Builder builder = AzureConnectParam.newBuilder();

Methods of AzureConnectParam.Builder:

Method

Description

Parameters

withContainerName(String containerName)

Sets the Azure container name.

containerName: The target container name.

withConnStr(String connStr)

Sets the connection string.

connStr: A connection string to an Azure Storage account, which can be parsed to an account_url and a credential. To generate a connection string, see https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string

withAccountUrl(String accountUrl)

Sets the account URL.

accountUrl: A string in a format like https://<storage-account>.blob.core.windows.net. For more information, see https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview

withCredential(TokenCredential credential)

Sets the credential.

credential: The access key for the account. For more information, see https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal#view-account-access-keys

build()

Constructs an AzureConnectParam object.

N/A
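
The connStr description above says that a connection string can be parsed to an account_url and a credential. The JDK-only sketch below shows one way to picture that split. It assumes the common Key=Value; layout with the keys DefaultEndpointsProtocol, AccountName, AccountKey, and EndpointSuffix. AzureConnStrSketch and its methods are hypothetical, and this is not necessarily how the SDK parses the string internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Milvus or Azure SDKs.
class AzureConnStrSketch {

    // Splits "Key=Value;Key=Value" segments. Only the first '=' separates the key
    // from the value, because Base64 account keys may end with '=' padding.
    static Map<String, String> parse(String connStr) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connStr.split(";")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                parts.put(segment.substring(0, eq).trim(), segment.substring(eq + 1).trim());
            }
        }
        return parts;
    }

    // Derives a blob account URL of the form https://<storage-account>.blob.<suffix>.
    static String accountUrl(Map<String, String> parts) {
        String account = parts.get("AccountName");
        if (account == null) {
            throw new IllegalArgumentException("AccountName is missing");
        }
        String protocol = parts.getOrDefault("DefaultEndpointsProtocol", "https");
        String suffix = parts.getOrDefault("EndpointSuffix", "core.windows.net");
        return protocol + "://" + account + ".blob." + suffix;
    }

    public static void main(String[] args) {
        // Placeholder values only; never hard-code a real account key.
        String connStr = "DefaultEndpointsProtocol=https;AccountName=myaccount;"
                + "AccountKey=bm90LWEtcmVhbC1rZXk=;EndpointSuffix=core.windows.net";
        Map<String, String> parts = parse(connStr);
        System.out.println(accountUrl(parts));
    }
}
```

In practice, passing the whole string to withConnStr is enough; the sketch only illustrates which pieces the string carries.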

S3ConnectParam

Use the S3ConnectParam.Builder to construct an S3ConnectParam object.

import io.milvus.bulkwriter.connect.S3ConnectParam;
S3ConnectParam.Builder builder = S3ConnectParam.newBuilder();

Methods of S3ConnectParam.Builder:

Method

Description

Parameters

withCloudName(String cloudName)

Sets the cloud name of the S3-compatible storage service.

cloudName: The cloud name.

withBucketName(String bucketName)

Sets the bucket name.

bucketName: The bucket name.

withEndpoint(String endpoint)

Sets the endpoint.

endpoint: The endpoint.

withAccessKey(String accessKey)

Sets the access key.

accessKey: The access key.

withSecretKey(String secretKey)

Sets the secret key.

secretKey: The secret key.

withSessionToken(String sessionToken)

Sets the session token.

sessionToken: The session token.

withRegion(String region)

Sets the region name.

region: The region name.

withHttpClient(OkHttpClient httpClient)

Sets the HTTP client, if necessary.

httpClient: The HTTP client.

build()

Constructs an S3ConnectParam object.

N/A

Example

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.*;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.grpc.DataType;
import io.milvus.grpc.ImportResponse;
import io.milvus.param.R;
import io.milvus.param.bulkinsert.BulkInsertParam;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;
import java.util.List;

// DIM, COLLECTION_NAME, CLOUD_NAME, the STORAGE_* constants, milvusClient, and the
// CommonUtils helper are assumed to be defined elsewhere in your application.

// Define a schema with an Int64 primary key and a float vector field.
CollectionSchemaParam collectionSchema = CollectionSchemaParam.newBuilder()
        .addFieldType(FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(false)
                .build())
        .addFieldType(FieldType.newBuilder()
                .withName("vector")
                .withDataType(DataType.FloatVector)
                .withDimension(DIM)
                .build())
        .build();

// Describe how to reach the S3-compatible bucket.
StorageConnectParam connectParam = S3ConnectParam.newBuilder()
        .withEndpoint(STORAGE_ENDPOINT)
        .withCloudName(CLOUD_NAME)
        .withBucketName(STORAGE_BUCKET)
        .withAccessKey(STORAGE_ACCESS_KEY)
        .withSecretKey(STORAGE_SECRET_KEY)
        .withRegion(STORAGE_REGION)
        .build();

// Configure the writer: target path, output format, and a 512 MB chunk size.
RemoteBulkWriterParam bulkWriterParam = RemoteBulkWriterParam.newBuilder()
        .withCollectionSchema(collectionSchema)
        .withRemotePath("bulk_data")
        .withFileType(BulkFileType.PARQUET)
        .withChunkSize(512 * 1024 * 1024)
        .withConnectParam(connectParam)
        .build();

try (RemoteBulkWriter remoteBulkWriter = new RemoteBulkWriter(bulkWriterParam)) {
    Gson gson = new Gson();
    for (int i = 0; i < 10000; ++i) {
        JsonObject row = new JsonObject();
        row.addProperty("id", i);
        row.add("vector", gson.toJsonTree(CommonUtils.generateFloatVector(DIM)));

        remoteBulkWriter.appendRow(row);
    }
    System.out.printf("%s rows appended%n", remoteBulkWriter.getTotalRowCount());
    System.out.printf("%s rows in buffer not flushed%n", remoteBulkWriter.getBufferRowCount());
    System.out.println("Generate data files...");
    // Persist the remaining buffered rows and complete the writer.
    remoteBulkWriter.commit(false);

    List<List<String>> batchFiles = remoteBulkWriter.getBatchFiles();
    System.out.printf("Data files have been uploaded: %s%n", batchFiles);

    // Submit each batch of files as one bulk-insert job.
    for (List<String> files : batchFiles) {
        R<ImportResponse> response = milvusClient.bulkInsert(BulkInsertParam.newBuilder()
                .withCollectionName(COLLECTION_NAME)
                .withFiles(files)
                .build());
    }
} catch (Exception e) {
    System.out.println("allTypesRemoteWriter catch exception: " + e);
    throw e;
}