メインコンテンツまでスキップ
バージョン: User Guides (BYOC)

データインポート(SDK)

このガイドでは、SDKを使用してバルクライターおよびバルクインポートAPIでコレクションにデータをインポートする方法を学習できます。

または、高速エンドツーエンドコースも参照できます。これには、データの準備とZilliz Cloudコレクションへのデータインポートの両方が含まれています。

依存関係のインストール

端末で以下のコマンドを実行して、pymilvusminio をインストールするか、最新バージョンにアップグレードしてください。

python3 -m pip install --upgrade pymilvus minio

準備されたデータの確認

バッチライターツールを使用してデータを準備し、準備されたファイルへのパスを取得したら、Zilliz Cloudコレクションにそれらをインポートする準備が整っています。準備ができているかどうかを確認するには、以下のようにしてください。

from minio import Minio

# サードパーティの定数
ACCESS_KEY = "YOUR_ACCESS_KEY"
SECRET_KEY = "YOUR_SECRET_KEY"
BUCKET_NAME = "YOUR_BUCKET_NAME"
REMOTE_PATH = "YOUR_REMOTE_PATH"

client = Minio(
endpoint="storage.googleapis.com", # AWS S3の場合は 's3.amazonaws.com' を使用
access_key=ACCESS_KEY,
secret_key=SECRET_KEY,
secure=True
)

objects = client.list_objects(
bucket_name=BUCKET_NAME,
prefix=REMOTE_PATH,
recursive=True
)

print([obj.object_name for obj in objects])

# 出力
#
# [
# "folder/1/claps.npy",
# "folder/1/id.npy",
# "folder/1/link.npy",
# "folder/1/publication.npy",
# "folder/1/reading_time.npy",
# "folder/1/responses.npy",
# "folder/1/title.npy",
# "folder/1/vector.npy"
# ]

データのインポート

データとコレクションの準備が整ったら、オブジェクトストレージバケットやブロックストレージのblobコンテナなどの外部ストレージを介して、特定のコレクションにデータをインポートできます。

データのインポート

外部ストレージを介してデータをインポートする場合は、以下のようにしてください。

from pymilvus.bulk_writer import bulk_import

# 準備されたデータファイルからデータをバルクインポート
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""
STORAGE_URL = ""
ACCESS_KEY = ""
SECRET_KEY = ""

res = bulk_import(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
collection_name="quick_setup",
object_url=STORAGE_URL,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY
)

print(res.json())

# 出力
#
# {
# "code": 0,
# "data": {
# "jobId": "9d0bc230-6b99-4739-a872-0b91cfe2515a"
# }
# }
📘注意

データインポートが成功するように、ターゲットコレクションに実行中または保留中のインポートジョブが10,000個未満であることを確認してください。

インポートの進行状況の確認

指定されたバルクインポートジョブの進行状況を確認できます。

import json
from pymilvus.bulk_writer import get_import_progress

## Zilliz Cloud定数
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

# バルクインサートジョブの進行状況を取得
resp = get_import_progress(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID,
job_id="job-01fa0e5d42cjxudhpuehyp",
)

print(json.dumps(resp.json(), indent=4))

すべてのインポートジョブのリスト

すべてのバルクインポートタスクについても知りたい場合は、以下のようにリストインポートジョブAPIを呼び出すことができます。

import json
from pymilvus.bulk_writer import list_import_jobs

## Zilliz Cloud定数
CLOUD_API_ENDPOINT = "https://api.cloud.zilliz.com"
CLUSTER_ID = "inxx-xxxxxxxxxxxxxxx"
API_KEY = ""

# バルクインサートジョブをリスト
resp = list_import_jobs(
api_key=API_KEY,
url=CLOUD_API_ENDPOINT,
cluster_id=CLUSTER_ID
)

print(json.dumps(resp.json(), indent=4))