メインコンテンツまでスキップ
バージョン: User Guides (Cloud)

サービングクラスターのクイックスタート

サービングクラスターは、リアルタイムの本番サービング向けにコンピュートとストレージを統合した自己完結型サーバーです。Extract-Transform-Load(ETL)パイプラインでデータを整備した後、サービングクラスターに取り込むことで大幅な性能向上を実現できます。

開始前の準備

以下の手順では、サービングクラスターを作成済みで、エンドポイントとアクセス認証情報を取得済みであることを前提としています。

ステップ 1: 接続を設定する

クラスター認証情報または API キーを取得したら、それを使ってクラスターに接続できます。

from pymilvus import MilvusClient, DataType

SERVING_CLUSTER_ENDPOINT = "https://{cluster-id}.{region}.vectordb.zillizcloud.com:19530"
TOKEN = "YOUR_CLUSTER_TOKEN"
# A valid token could be either
# - An API key, or
# - A colon-joined cluster username and password, as in \`user:pass\`

# 1. Set up a Milvus client
client = MilvusClient(
uri=SERVING_CLUSTER_ENDPOINT,
token=TOKEN
)

ステップ 2: (任意)データベースを作成する。

サービングクラスターにはデフォルトデータベースが用意されています。デフォルトを使う場合はこの手順をスキップできます。必要に応じて以下のように作成してください。

# connect to the serving cluster
client = MilvusClient(
# a cluster-specific endpoint
uri=SERVING_CLUSTER_ENDPOINT,
token=TOKEN
)

client.create_database(
db_name="my_database"
)

ステップ 3: コレクションを作成する。

データベースの準備ができたら、そこにマネージドコレクションを作成できます。外部データファイルにコレクションの列をマッピングする外部コレクションとは異なり、マネージドコレクションはデータをインポートすることで高いパフォーマンスを得られます。

以下の例では、コレクションスキーマの設定とコレクション作成の方法を示します。

from pymilvus import MilvusClient, DataType

schema = MilvusClient.create_schema()

schema.add_field(
field_name="product_id",
datatype=DataType.INT64,
is_primary=True
)

schema.add_field(
field_name="product_name",
datatype=DataType.VARCHAR,
max_length=512
)

schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768
)

続いて、上記スキーマでコレクションを作成します。デフォルトデータベースを使用する場合は、db_name パラメータを省略できます。

client.use_database(
db_name="my_database"
)

# create the collection
client.create_collection(
collection_name="prod_collection",
schema=schema
)

ステップ 4: インデックスを作成する。

すべてのベクトルフィールドに対してインデックスを作成する必要があり、必要に応じてスカラーフィールドにもインデックスを作成できます。

index_params = client.prepare_index_params()

# Add indexes
index_params.add_index(
field_name="embedding",
index_type="AUTOINDEX",
metric_type="COSINE"
)

index_params.add_index(
field_name="product_name",
index_type="AUTOINDEX"
)

client.create_index(
db_name="my_database",
collection_name="prod_collection",
index_params=index_params
)

ステップ 5: コレクションをロードする。

インデックスの準備ができたら、コレクションをメモリにロードします。

client.load_collection(
db_name="my_database",
collection_name="prod_collection"
)

ステップ 6: データをインポートする。

準備が整ったら、処理済みデータをインポートできます。以下の例では、処理済みデータを外部ストレージバケットに保存していることを前提としています。

バケット内データ形式やストレージ連携については、Format Options を参照してください。

from pymilvus.bulk_writer import bulk_import

# The path should be relative to the root
# of a zilliz cloud volume or an external storage
OBJECT_URLS = [[
"https://s3.us-west-2.amazonaws.com/your-bucket/path/in/external/storage.json"
]]

ACCESS_KEY = "YOUR_STORAGE_ACCESS_KEY"
SECRET_KEY = "YOUR_STORAGE_SECRET_KEY"

res = bulk_import(
api_key="YOUR_ZILLIZ_API_KEY",
url="https://api.cloud.zilliz.com",
cluser_id="inxx-xxxxxxxxxxxxxxxxxxx",
db_name="my_database",
collection_name="prod_collection",
object_url=OBJECT_URLS,
access_key=ACCESS_KEY,
secret_key=SECRET_KEY
)

# job-xxxxxxxxxxxxxxxxxxxxx

返却された job ID を使って進捗を確認できます。

import json
from pymilvus.bulk_writer import get_import_progress

# Get bulk-insert job progress
resp = get_import_progress(
api_key="YOUR_ZILLIZ_API_KEY",
url="https://api.cloud.zilliz.com",
cluster_id="inxx-xxxxxxxxxxxxxxxxxxx",
job_id="job-xxxxxxxxxxxxxxxxxxxxx",
)

print(json.dumps(resp.json(), indent=4))

ステップ 7: データを提供する。

インポートが完了したら、検索、クエリ、ハイブリッド検索を通じてユーザーにデータを提供できます。

query_vector = [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, ..., 0.9029438446296592]
res = client.search(
db_name="my_database",
collection_name="prod_collection",
anns_field="embedding",
data=[query_vector],
limit=3,
output_fields=["product_name"],
search_params={"metric_type": "COSINE"}
)