Files
dify-docs/assets/migrate_weaviate_collections.py
Riskey 82f45309ca add the migrations script (#605)
Co-authored-by: Riskey <riskey47@dify.ai>
2025-12-11 12:30:38 +08:00

389 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Migration script to fix Weaviate schema incompatibility between 1.19.0 and 1.27.0+
This script:
- Identifies collections with old schema (no vectorConfig)
- Creates new collections with proper vectorConfig including "default" named vector
- Migrates data using cursor-based pagination (efficient for large datasets)
- Uses batch operations for fast inserts
- Preserves all object properties and vectors
"""
import weaviate
from weaviate.classes.config import Configure, VectorDistances
import sys
import time
from typing import List, Dict, Any
# Configuration
WEAVIATE_HOST = "localhost"
WEAVIATE_PORT = 8080
WEAVIATE_GRPC_PORT = 50051
WEAVIATE_API_KEY = "WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih"
BATCH_SIZE = 100
def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]:
"""Identify collections that need migration (those without vectorConfig)"""
collections_to_migrate = []
all_collections = client.collections.list_all()
print(f"Found {len(all_collections)} total collections")
for collection_name in all_collections.keys():
# Only check Vector_index collections (Dify knowledge bases)
if not collection_name.startswith("Vector_index_"):
continue
collection = client.collections.get(collection_name)
config = collection.config.get()
# Check if this collection has the old schema
if config.vector_config is None:
collections_to_migrate.append(collection_name)
print(f" - {collection_name}: OLD SCHEMA (needs migration)")
else:
print(f" - {collection_name}: NEW SCHEMA (skip)")
return collections_to_migrate
def get_collection_schema(client: weaviate.WeaviateClient, collection_name: str) -> Dict[str, Any]:
"""Get the full schema of a collection via REST API"""
import requests
response = requests.get(
f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{collection_name}",
headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to get schema: {response.text}")
def create_new_collection(client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any]) -> str:
"""Create a new collection with updated schema using REST API"""
import requests
# Generate new collection name
new_name = f"{old_name}_migrated"
print(f"Creating new collection: {new_name}")
# Build new schema with proper vectorConfig
# Note: When using vectorConfig (named vectors), we don't set class-level vectorizer
new_schema = {
"class": new_name,
# This is the key: define vectorConfig with "default" named vector
# Do NOT set class-level vectorizer when using vectorConfig
"vectorConfig": {
"default": {
"vectorizer": {
"none": {}
},
"vectorIndexType": "hnsw",
"vectorIndexConfig": {
"distance": "cosine",
"ef": -1,
"efConstruction": 128,
"maxConnections": 32
}
}
},
"properties": []
}
# Copy properties from old schema
if "properties" in schema:
new_schema["properties"] = schema["properties"]
# Create collection via REST API
response = requests.post(
f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema",
json=new_schema,
headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
)
if response.status_code not in [200, 201]:
raise Exception(f"Failed to create collection: {response.text}")
print(f" Created new collection: {new_name}")
return new_name
def migrate_collection_data(
client: weaviate.WeaviateClient,
old_collection_name: str,
new_collection_name: str
) -> int:
"""Migrate data from old collection to new collection using cursor-based pagination"""
old_collection = client.collections.get(old_collection_name)
new_collection = client.collections.get(new_collection_name)
total_migrated = 0
cursor = None
print(f"Migrating data from {old_collection_name} to {new_collection_name}")
while True:
# Fetch batch of objects using cursor-based pagination
if cursor is None:
# First batch
response = old_collection.query.fetch_objects(
limit=BATCH_SIZE,
include_vector=True
)
else:
# Subsequent batches using cursor
response = old_collection.query.fetch_objects(
limit=BATCH_SIZE,
include_vector=True,
after=cursor
)
objects = response.objects
if not objects:
break
# Use batch insert for efficiency
with new_collection.batch.dynamic() as batch:
for obj in objects:
# Prepare properties
properties = obj.properties
# Add object with vector
batch.add_object(
properties=properties,
vector=obj.vector["default"] if isinstance(obj.vector, dict) else obj.vector,
uuid=obj.uuid
)
total_migrated += len(objects)
print(f" Migrated {total_migrated} objects...")
# Update cursor for next iteration
if len(objects) < BATCH_SIZE:
# Last batch
break
else:
# Get the last object's UUID for cursor
cursor = objects[-1].uuid
print(f" Total migrated: {total_migrated} objects")
return total_migrated
def verify_migration(
client: weaviate.WeaviateClient,
old_collection_name: str,
new_collection_name: str
):
"""Verify that the migration was successful"""
old_collection = client.collections.get(old_collection_name)
new_collection = client.collections.get(new_collection_name)
# Count objects in both collections
old_count_response = old_collection.query.fetch_objects(limit=1)
new_count_response = new_collection.query.fetch_objects(limit=1)
# Get aggregation for accurate counts
old_agg = old_collection.aggregate.over_all(total_count=True)
new_agg = new_collection.aggregate.over_all(total_count=True)
old_count = old_agg.total_count
new_count = new_agg.total_count
print(f"\nVerification:")
print(f" Old collection ({old_collection_name}): {old_count} objects")
print(f" New collection ({new_collection_name}): {new_count} objects")
if old_count == new_count:
print(f" Status: SUCCESS - Counts match!")
return True
else:
print(f" Status: WARNING - Counts don't match!")
return False
def replace_old_collection(
client: weaviate.WeaviateClient,
old_collection_name: str,
new_collection_name: str
):
"""Replace old collection with migrated one by recreating with original name"""
import requests
print(f"\nReplacing old collection with migrated data...")
# Step 1: Get data from migrated collection
print(f" Step 1: Getting data from migrated collection...")
migrated = client.collections.get(new_collection_name)
objects = migrated.query.fetch_objects(include_vector=True, limit=10000)
print(f" Found {len(objects.objects)} objects")
# Step 2: Delete old collection
print(f" Step 2: Deleting old collection...")
response = requests.delete(
f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{old_collection_name}",
headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
)
if response.status_code != 200:
print(f" Warning: Could not delete old collection: {response.text}")
else:
print(f" Deleted")
# Step 3: Get schema from migrated collection
print(f" Step 3: Getting schema from migrated collection...")
schema_response = requests.get(
f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{new_collection_name}",
headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
)
schema = schema_response.json()
schema["class"] = old_collection_name
# Step 4: Create collection with original name and new schema
print(f" Step 4: Creating collection with original name...")
create_response = requests.post(
f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema",
json=schema,
headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
)
if create_response.status_code not in [200, 201]:
raise Exception(f"Failed to create collection: {create_response.text}")
print(f" Created")
# Step 5: Copy data to collection with original name
print(f" Step 5: Copying data to original collection name...")
new_collection = client.collections.get(old_collection_name)
with new_collection.batch.dynamic() as batch:
for obj in objects.objects:
batch.add_object(
properties=obj.properties,
vector=obj.vector,
uuid=obj.uuid
)
count = new_collection.aggregate.over_all(total_count=True).total_count
print(f" Copied {count} objects")
# Step 6: Delete the temporary migrated collection
print(f" Step 6: Cleaning up temporary migrated collection...")
response = requests.delete(
f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{new_collection_name}",
headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}
)
if response.status_code == 200:
print(f" Cleaned up")
print(f"\n SUCCESS! {old_collection_name} now has the new schema with {count} objects")
return True
def migrate_all_collections():
"""Main migration function"""
print("=" * 80)
print("Weaviate Collection Migration Script")
print("Migrating from Weaviate 1.19.0 schema to 1.27.0+ schema")
print("=" * 80)
print()
client = weaviate.connect_to_local(
host=WEAVIATE_HOST,
port=WEAVIATE_PORT,
grpc_port=WEAVIATE_GRPC_PORT,
auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY)
)
try:
# Step 1: Identify collections that need migration
print("Step 1: Identifying collections that need migration...")
collections_to_migrate = identify_old_collections(client)
if not collections_to_migrate:
print("\nNo collections need migration. All collections are up to date!")
return
print(f"\nFound {len(collections_to_migrate)} collections to migrate:")
for col in collections_to_migrate:
print(f" - {col}")
# Confirm before proceeding
print("\nThis script will:")
print("1. Create new collections with updated schema")
print("2. Copy all data using efficient batch operations")
print("3. Verify the migration")
print("4. Optionally rename collections to activate the new ones")
print()
# Step 2: Migrate each collection
for collection_name in collections_to_migrate:
print("\n" + "=" * 80)
print(f"Migrating: {collection_name}")
print("=" * 80)
try:
# Get old schema
schema = get_collection_schema(client, collection_name)
# Create new collection
new_collection_name = create_new_collection(client, collection_name, schema)
# Migrate data
migrated_count = migrate_collection_data(client, collection_name, new_collection_name)
# Verify migration
success = verify_migration(client, collection_name, new_collection_name)
if success and migrated_count > 0:
print(f"\nMigration successful for {collection_name}!")
print(f"New collection: {new_collection_name}")
# Automatically replace old collection with migrated one
try:
replace_old_collection(client, collection_name, new_collection_name)
except Exception as e:
print(f"\nWarning: Could not automatically replace collection: {e}")
print(f"\nTo activate manually:")
print(f"1. Delete the old collection: {collection_name}")
print(f"2. Rename {new_collection_name} to {collection_name}")
except Exception as e:
print(f"\nError migrating {collection_name}: {e}")
print(f"Skipping this collection and continuing...")
continue
print("\n" + "=" * 80)
print("Migration Complete!")
print("=" * 80)
print("\nSummary:")
print(f" Collections migrated: {len(collections_to_migrate)}")
print(f"\nNext steps:")
print(f"1. Test the new collections (*_migrated)")
print(f"2. If everything works, delete or backup the old collections")
print(f"3. Rename the new collections to remove '_migrated' suffix")
finally:
client.close()
if __name__ == "__main__":
try:
migrate_all_collections()
except KeyboardInterrupt:
print("\n\nMigration interrupted by user.")
sys.exit(1)
except Exception as e:
print(f"\n\nFatal error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)