データのインポート(SDK)
このガイドでは、SDKを使用してバルクライターおよびバルクインポートAPIでデータをコレクションにインポートする方法を学習します。
または、私たちの短期集中型エンドツーエンドコースも参照できます。このコースでは、データの準備とZilliz Cloudコレクションへのデータインポートの両方をカバーしています。
依存関係をインストール
- Python
- Java
ターミナルで以下のコマンドを実行して、pymilvusおよびminioをインストールするか、最新バージョンにアップグレードしてください。
python3 -m pip install --upgrade pymilvus minio
- Apache Mavenの場合は、pom.xmlの依存関係に以下を追加:
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.5.9</version>
</dependency>
- Gradle/Grailsの場合は、以下を実行:
compile 'io.milvus:milvus-sdk-java:2.4.8'
compile 'io.minio:minio:8.5.9'
準備されたデータを確認
BulkWriterツールを使用してデータを準備し、準備されたファイルのパスを取得したとします。Zilliz Cloudコレクションにインポートする準備ができています。準備ができているかどうかを確認するには、以下のようにします:
- Python
- Java
from minio import Minio
# サードパーティの定数
ACCESS_KEY = "YOUR_ACCESS_KEY"
SECRET_KEY = "YOUR_SECRET_KEY"
BUCKET_NAME = "YOUR_BUCKET_NAME"
REMOTE_PATH = "YOUR_REMOTE_PATH"
client = Minio(
endpoint="storage.googleapis.com", # AWS S3の場合は 's3.amazonaws.com' を使用
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
secure=True
)
objects = client.list_objects(
bucket_name=BUCKET_NAME,
prefix=REMOTE_PATH,
recursive=True
)
print([obj.object_name for obj in objects])
# 出力
#
# [
# "folder/1/claps.npy",
# "folder/1/id.npy",
# "folder/1/link.npy",
# "folder/1/publication.npy",
# "folder/1/reading_time.npy",
# "folder/1/responses.npy",
# "folder/1/title.npy",
# "folder/1/vector.npy"
# ]
import io.minio.MinioClient;
import io.minio.Result;
import io.minio.messages.Item;
import java.util.Iterator;
// サードパーティの定数
String ACCESS_KEY = "YOUR_ACCESS_KEY";
String SECRET_KEY = "YOUR_SECRET_KEY";
String BUCKET_NAME = "YOUR_BUCKET_NAME";
String REMOTE_PATH = "YOUR_REMOTE_PATH";
MinioClient minioClient = MinioClient.builder()
.endpoint("storage.googleapis.com") // AWS S3の場合は 's3.amazonaws.com' を使用
.credentials(ACCESS_KEY, SECRET_KEY)
.build();
Iterable<Result<Item>> results = minioClient.listObjects(
ListObjectsArgs.builder().bucket(BUCKET_NAME).prefix(REMOTE_PATH).build();
);
while (results.hasNext()) {
Result<Item> result = results.next();
System.out.println(result.get().objectName());
}
// 出力
// [[1.parquet]]
データのインポート
データとコレクションの準備ができたら、ステージ経由またはオブジェクトストレージバケットやブロックストレージblobコンテナなどの外部ストレージ経由で、特定のコレクションにデータをインポートできます。
ステージを介したデータのインポートPrivate Preview
ステージ経由でデータをインポートするには、事前にストレージを作成し、データをステージにアップロードする必要があります。詳細については、データのマージを参照してください。
ステージの準備ができ、ソースデータファイルが配置されたら、以下のようにステージからデータをインポートできます:
- Python
- Java
from pymilvus.bulk_writer import bulk_import
def cloud_bulkinsert():
# URLの値は固定です。
# 海外リージョンの場合は:https://api.cloud.zilliz.com
# 中国リージョンの場合は:https://api.cloud.zilliz.com.cn
url = "https://api.cloud.zilliz.com"
api_key = ""
cluster_id = "inxx-xxxxxxxxxxxxxxx"
stage_name = "my-first-stage"
data_path = "dataPath"
print(f"\n===================== クラウドベクトルDBにファイルをインポート ====================")
resp = bulk_import(
url=url,
api_key=api_key,
cluster_id=cluster_id,
collection_name='quick_setup',
stage_name=stage_name,
data_paths=[[data_path]]
)
print(resp.json())
if __name__ == '__main__':
# # クラウドバルクインサートAPIを呼び出すには、Zilliz Cloud(https://zilliz.com/cloud)からクラウドサービスを申し込む必要があります
cloud_bulkinsert()
private static String bulkImport() throws InterruptedException {
/**
* URLの値は固定です。
*/
String CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com";
String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
String API_KEY = "";
String STAGE_NAME = "my-first-stage";
List<String> DATA_PATH = Lists.newArrayList("dataPath");
StageImportRequest stageImportRequest = StageImportRequest.builder()
.apiKey(API_KEY)
.clusterId(CLUSTER_ID).collectionName("quick_setup")
.stageName(STAGE_NAME).dataPaths(Lists.newArrayList(Collections.singleton(DATA_PATH)))
.build();
String bulkImportResult = BulkImportUtils.bulkImport(CLOUD_API_ENDPOINT, stageImportRequest);
System.out.println(bulkImportResult);
JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
System.out.println("バルクインサートタスクを作成しました、ジョブID: " + jobId);
return jobId;
}
public static void main(String[] args) throws Exception {
String jobId = bulkImport();
}
// 0f7fe853-d93e-4681-99f2-4719c63585cc
外部ストレージを介したデータのインポート
外部ストレージ経由でデータをインポートする場合は、以下のようにします:
- Python
- Java
from pymilvus.bulk_writer import bulk_import
# 準備したデータファイルからデータをバルクインポート
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""
STORAGE_URL = ""
ACCESS_KEY = ""
SECRET_KEY = ""
res = bulk_import(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
collection_name="quick_setup",
object_url=STORAGE_URL,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY
)
print(res.json())
# 出力
#
# {
# "code": 0,
# "data": {
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a"
# }
# }
private static String bulkImport() throws InterruptedException {
/**
* URLの値は固定です。
*/
String CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com";
String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
String API_KEY = "";
String STORAGE_URL = "";
String ACCESS_KEY = "";
String SECRET_KEY = "";
CloudImportRequest cloudImportRequest = CloudImportRequest.builder()
.apiKey(API_KEY)
.clusterId(CLUSTER_ID)
.collectionName("quick_setup")
.objectUrl(STORAGE_URL)
.accessKey(ACCESS_KEY)
.secretKey(SECRET_KEY)
.build();
String bulkImportResult = BulkImport.bulkImport(CLOUD_API_ENDPOINT, cloudImportRequest);
System.out.println(bulkImportResult);
JsonObject bulkImportObject = new Gson().fromJson(bulkImportResult, JsonObject.class);
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
System.out.println("バルクインサートタスクを作成しました、ジョブID: " + jobId);
return jobId;
}
public static void main(String[] args) throws Exception {
String jobId = bulkImport();
}
// 0f7fe853-d93e-4681-99f2-4719c63585cc
データインポートが成功するには、ターゲットコレクションに実行中または保留中のインポートジョブが10,000件未満であることを確認してください。
インポートの進行状況を確認
指定されたバルクインポートジョブの進行状況を確認できます。
- Python
- Java
import json
from pymilvus.bulk_writer import get_import_progress
## Zilliz Cloud定数
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""
# バルクインサートジョブの進行状況を取得
resp = get_import_progress(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
job_id="job-01fa0e5d42cjxudhpuehyp",
)
print(json.dumps(resp.json(), indent=4))
private static void getImportProgress(String jobId) {
/**
* URLの値は固定です。
*/
String CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com";
String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
String API_KEY = "";
CloudDescribeImportRequest request = CloudDescribeImportRequest.builder()
.apiKey(API_KEY)
.clusterId(CLUSTER_ID)
.jobId(jobId)
.build();
String getImportProgressResult = BulkImport.getImportProgress(CLOUD_API_ENDPOINT, request);
System.out.println("インポート進行状況を取得しました、結果: " + getImportProgressResult);
}
public static void main(String[] args) throws Exception {
getImportProgress("job-xxxx");
}
すべてのインポートジョブを一覧表示
すべてのバルクインポートタスクについても知りたい場合は、以下のようにlist-import-jobs APIを呼び出すことができます:
- Python
- Java
import json
from pymilvus.bulk_writer import list_import_jobs
## Zilliz Cloud定数
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""
# バルクインサートジョブを一覧表示
resp = list_import_jobs(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID
)
print(json.dumps(resp.json(), indent=4))
private static void listImportJobs() {
/**
* URLの値は固定です。
*/
String CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com";
String CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx";
String API_KEY = "";
CloudListImportJobsRequest listImportJobsRequest = CloudListImportJobsRequest.builder()
.apiKey(API_KEY)
.clusterId(CLUSTER_ID).build();
String listImportJobsResult = BulkImport.listImportJobs(CLOUD_API_ENDPOINT, listImportJobsRequest);
System.out.println(listImportJobsResult);
}
public static void main(String[] args) throws Exception {
listImportJobs();
}