LocalBulkWriter

A LocalBulkWriter instance rewrites your raw data locally in a format that Milvus understands.

LocalBulkWriter(LocalBulkWriterParam bulkWriterParam)

Methods of LocalBulkWriter:

Method

Description

Parameters

appendRow(JsonObject rowData)

Appends a row to the buffer. Once the buffer size exceeds a threshold, the writer persists the buffer to a data file.

rowData: A gson.JsonObject that stores the data of a row.
For each field:
- If dataType is Bool/Int8/Int16/Int32/Int64/Float/Double/Varchar, use JsonObject.addProperty(key, value) to set the value;
- If dataType is FloatVector, use JsonObject.add(key, gson.toJsonTree(List<Float>)) to set the value;
- If dataType is BinaryVector/Float16Vector/BFloat16Vector, use JsonObject.add(key, gson.toJsonTree(byte[])) to set the value;
- If dataType is SparseFloatVector, use JsonObject.add(key, gson.toJsonTree(SortedMap<Long, Float>)) to set the value;
- If dataType is Array, use JsonObject.add(key, gson.toJsonTree(List of Boolean/Integer/Short/Long/Float/Double/String)) to set the value;
- If dataType is JSON, use JsonObject.add(key, JsonElement) to set the value.
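
As a rough illustration of the value types listed above, the following sketch prepares a BinaryVector payload as a byte[] and a SparseFloatVector payload as a SortedMap<Long, Float> using only the JDK. The class and method names are hypothetical and not part of the Milvus SDK, and the bit order used when packing is an assumption. The resulting objects would then be passed through gson.toJsonTree(...) as described above.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical helpers for illustration only; not part of the Milvus SDK.
class VectorValueSketch {

    // Packs one bit per dimension into bytes, 8 dimensions per byte.
    // Assumption: the first dimension maps to the most significant bit.
    static byte[] packBits(boolean[] bits) {
        if (bits.length % 8 != 0) {
            throw new IllegalArgumentException("dimension must be a multiple of 8");
        }
        byte[] packed = new byte[bits.length / 8];
        for (int i = 0; i < bits.length; i++) {
            if (bits[i]) {
                packed[i / 8] |= (byte) (0x80 >>> (i % 8));
            }
        }
        return packed;
    }

    // Builds a sparse vector as index -> value, kept sorted by index.
    static SortedMap<Long, Float> sparse(long[] indices, float[] values) {
        if (indices.length != values.length) {
            throw new IllegalArgumentException("indices and values must have the same length");
        }
        SortedMap<Long, Float> vector = new TreeMap<>();
        for (int i = 0; i < indices.length; i++) {
            vector.put(indices[i], values[i]);
        }
        return vector;
    }

    public static void main(String[] args) {
        byte[] binary = packBits(new boolean[] {true, false, false, false, false, false, false, true});
        SortedMap<Long, Float> sparseVector = sparse(new long[] {5L, 1L}, new float[] {0.5f, 0.25f});
        System.out.println("binary vector bytes: " + binary.length);
        System.out.println("sparse vector: " + sparseVector);
    }
}
```

A TreeMap is used here because it satisfies the SortedMap type regardless of the order in which the indices arrive.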

commit(boolean async)

Forces the buffered data to be persisted to data files and completes the writer.

async: Set to true to return immediately while the data files are persisted in the background. Set to false to block until all data files are persisted.

getBatchFiles()

Returns a List<List<String>> of the persisted data files. Each inner List<String> is one batch of files that can be submitted as a single job to the bulkinsert interface.

N/A
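
Because getBatchFiles() returns a nested list, it can help to walk it batch by batch. The sketch below uses made-up file paths in place of a real writer's output; the class name, method names, and paths are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Milvus SDK.
class BatchFilesSketch {

    // Counts the data files across all batches.
    static int countFiles(List<List<String>> batchFiles) {
        int total = 0;
        for (List<String> batch : batchFiles) {
            total += batch.size();
        }
        return total;
    }

    // Produces one summary line per batch; each batch corresponds to one import job.
    static List<String> describe(List<List<String>> batchFiles) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < batchFiles.size(); i++) {
            lines.add("batch " + i + ": " + String.join(", ", batchFiles.get(i)));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Stand-in for the value returned by getBatchFiles(); these paths are invented.
        List<List<String>> batchFiles = Arrays.asList(
                Arrays.asList("/tmp/bulk_writer/example/1.parquet"),
                Arrays.asList("/tmp/bulk_writer/example/2.parquet"));
        for (String line : describe(batchFiles)) {
            System.out.println(line);
        }
        System.out.println("total files: " + countFiles(batchFiles));
    }
}
```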

LocalBulkWriterParam

Use the LocalBulkWriterParam.Builder to construct a LocalBulkWriterParam object.

import io.milvus.bulkwriter.LocalBulkWriterParam;
LocalBulkWriterParam.Builder builder = LocalBulkWriterParam.newBuilder();

Methods of LocalBulkWriterParam.Builder:

Method

Description

Parameters

withCollectionSchema(CollectionSchemaParam collectionSchema)

Sets the collection schema. See the CollectionSchemaParam description in the Collection.createCollection() section.

collectionSchema: The schema of the target collection.

withLocalPath(String localPath)

Sets the local path to output the data files.

localPath: A local path.

withChunkSize(int chunkSize)

Sets the maximum size of a data chunk.
While rewriting your raw data, the writer splits it into chunks.
The value defaults to 128 MB.

chunkSize: The maximum size of a data chunk, in bytes.
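
Since chunkSize is an int, a small amount of arithmetic helps when choosing a value. The sketch below converts megabytes to bytes with an overflow check and gives a rough row count per chunk for a schema with one Int64 field and one FloatVector field. The class and method names are hypothetical, and the estimate assumes raw value sizes only, ignoring any file-format overhead or compression.

```java
// Hypothetical helper for illustration only; not part of the Milvus SDK.
class ChunkSizeSketch {

    // Converts megabytes to bytes; throws ArithmeticException if the result does not fit in an int.
    static int megabytes(int mb) {
        return Math.multiplyExact(mb, 1024 * 1024);
    }

    // Rough number of rows per chunk for a row of one Int64 plus one FloatVector of the given dimension.
    // Assumption: 8 bytes for the Int64 and 4 bytes per vector element, with no encoding overhead.
    static long estimateRowsPerChunk(int chunkSizeBytes, int dim) {
        long rowBytes = Long.BYTES + (long) dim * Float.BYTES;
        return chunkSizeBytes / rowBytes;
    }

    public static void main(String[] args) {
        int chunkSize = megabytes(512);
        System.out.println("chunk size in bytes: " + chunkSize);
        System.out.println("approximate rows per 128 MB chunk (dim 128): "
                + estimateRowsPerChunk(megabytes(128), 128));
    }
}
```

Math.multiplyExact is used so that a value too large for an int fails loudly instead of silently wrapping to a negative chunk size.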

withFileType(BulkFileType fileType)

Sets the type of the output file. Currently, only PARQUET is available.

fileType: The output file type.

build()

Constructs a LocalBulkWriterParam object.

N/A

Example

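import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.*;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.bulkwriter.common.utils.GeneratorUtils;
import io.milvus.grpc.DataType;
import io.milvus.param.collection.CollectionSchemaParam;
import io.milvus.param.collection.FieldType;

import java.util.List;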

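// Vector dimension. 128 is an arbitrary example value; use your collection's dimension.
final int DIM = 128;

// Schema with an Int64 primary key and a float vector field.
CollectionSchemaParam collectionSchema = CollectionSchemaParam.newBuilder()
        .addFieldType(FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.Int64)
                .withPrimaryKey(true)
                .withAutoID(false)
                .build())
        .addFieldType(FieldType.newBuilder()
                .withName("vector")
                .withDataType(DataType.FloatVector)
                .withDimension(DIM)
                .build())
        .build();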

// Write Parquet files of up to 512 MB each under /tmp/bulk_writer.
LocalBulkWriterParam bulkWriterParam = LocalBulkWriterParam.newBuilder()
        .withCollectionSchema(collectionSchema)
        .withLocalPath("/tmp/bulk_writer")
        .withFileType(BulkFileType.PARQUET)
        .withChunkSize(512 * 1024 * 1024)
        .build();

try (LocalBulkWriter localBulkWriter = new LocalBulkWriter(bulkWriterParam)) {
    Gson gson = new Gson();
    for (int i = 0; i < 100000; i++) {
        // Build one row: a scalar primary key and a float vector.
        JsonObject row = new JsonObject();
        row.addProperty("id", i);
        row.add("vector", gson.toJsonTree(GeneratorUtils.genFloatVector(DIM)));

        localBulkWriter.appendRow(row);
    }

    // Persist any remaining buffered rows, then list the output files.
    localBulkWriter.commit(false);
    List<List<String>> batchFiles = localBulkWriter.getBatchFiles();
    System.out.printf("Local writer done! output local files: %s%n", batchFiles);
} catch (Exception e) {
    System.out.println("Local writer catch exception: " + e);
    throw e;
}