From 82f45309caf0888b71dbcef93da5db50a921b7f5 Mon Sep 17 00:00:00 2001 From: Riskey <36894937+RiskeyL@users.noreply.github.com> Date: Thu, 11 Dec 2025 12:30:38 +0800 Subject: [PATCH] add the migrations script (#605) Co-authored-by: Riskey --- assets/migrate_weaviate_collections.py | 388 ++++++++++++++++++ .../troubleshooting/weaviate-v4-migration.mdx | 6 +- 2 files changed, 391 insertions(+), 3 deletions(-) create mode 100644 assets/migrate_weaviate_collections.py diff --git a/assets/migrate_weaviate_collections.py b/assets/migrate_weaviate_collections.py new file mode 100644 index 00000000..4631cae7 --- /dev/null +++ b/assets/migrate_weaviate_collections.py @@ -0,0 +1,388 @@ +#!/usr/bin/env python3 +""" +Migration script to fix Weaviate schema incompatibility between 1.19.0 and 1.27.0+ + +This script: +- Identifies collections with old schema (no vectorConfig) +- Creates new collections with proper vectorConfig including "default" named vector +- Migrates data using cursor-based pagination (efficient for large datasets) +- Uses batch operations for fast inserts +- Preserves all object properties and vectors +""" + +import weaviate +from weaviate.classes.config import Configure, VectorDistances +import sys +import time +from typing import List, Dict, Any + +# Configuration +WEAVIATE_HOST = "localhost" +WEAVIATE_PORT = 8080 +WEAVIATE_GRPC_PORT = 50051 +WEAVIATE_API_KEY = "WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih" +BATCH_SIZE = 100 + + +def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]: + """Identify collections that need migration (those without vectorConfig)""" + collections_to_migrate = [] + + all_collections = client.collections.list_all() + print(f"Found {len(all_collections)} total collections") + + for collection_name in all_collections.keys(): + # Only check Vector_index collections (Dify knowledge bases) + if not collection_name.startswith("Vector_index_"): + continue + + collection = client.collections.get(collection_name) + config = collection.config.get() + + # Check if this collection has the old schema + if config.vector_config is None: + collections_to_migrate.append(collection_name) + print(f" - {collection_name}: OLD SCHEMA (needs migration)") + else: + print(f" - {collection_name}: NEW SCHEMA (skip)") + + return collections_to_migrate + + +def get_collection_schema(client: weaviate.WeaviateClient, collection_name: str) -> Dict[str, Any]: + """Get the full schema of a collection via REST API""" + import requests + + response = requests.get( + f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{collection_name}", + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"} + ) + + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to get schema: {response.text}") + + +def create_new_collection(client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any]) -> str: + """Create a new collection with updated schema using REST API""" + import requests + + # Generate new collection name + new_name = f"{old_name}_migrated" + + print(f"Creating new collection: {new_name}") + + # Build new schema with proper vectorConfig + # Note: When using vectorConfig (named vectors), we don't set class-level vectorizer + new_schema = { + "class": new_name, + # This is the key: define vectorConfig with "default" named vector + # Do NOT set class-level vectorizer when using vectorConfig + "vectorConfig": { + "default": { + "vectorizer": { + "none": {} + }, + "vectorIndexType": "hnsw", + "vectorIndexConfig": { + "distance": "cosine", + "ef": -1, + "efConstruction": 128, + "maxConnections": 32 + } + } + }, + "properties": [] + } + + # Copy properties from old schema + if "properties" in schema: + new_schema["properties"] = schema["properties"] + + # Create collection via REST API + response = requests.post( + f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema", + json=new_schema, + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"} + ) + + if response.status_code not in [200, 201]: + raise Exception(f"Failed to create collection: {response.text}") + + print(f" Created new collection: {new_name}") + return new_name + + +def migrate_collection_data( + client: weaviate.WeaviateClient, + old_collection_name: str, + new_collection_name: str +) -> int: + """Migrate data from old collection to new collection using cursor-based pagination""" + + old_collection = client.collections.get(old_collection_name) + new_collection = client.collections.get(new_collection_name) + + total_migrated = 0 + cursor = None + + print(f"Migrating data from {old_collection_name} to {new_collection_name}") + + while True: + # Fetch batch of objects using cursor-based pagination + if cursor is None: + # First batch + response = old_collection.query.fetch_objects( + limit=BATCH_SIZE, + include_vector=True + ) + else: + # Subsequent batches using cursor + response = old_collection.query.fetch_objects( + limit=BATCH_SIZE, + include_vector=True, + after=cursor + ) + + objects = response.objects + + if not objects: + break + + # Use batch insert for efficiency + with new_collection.batch.dynamic() as batch: + for obj in objects: + # Prepare properties + properties = obj.properties + + # Add object with vector + batch.add_object( + properties=properties, + vector=obj.vector["default"] if isinstance(obj.vector, dict) else obj.vector, + uuid=obj.uuid + ) + + total_migrated += len(objects) + print(f" Migrated {total_migrated} objects...") + + # Update cursor for next iteration + if len(objects) < BATCH_SIZE: + # Last batch + break + else: + # Get the last object's UUID for cursor + cursor = objects[-1].uuid + + print(f" Total migrated: {total_migrated} objects") + return total_migrated + + +def verify_migration( + client: weaviate.WeaviateClient, + old_collection_name: str, + new_collection_name: str +): + """Verify that the migration was successful""" + + old_collection = client.collections.get(old_collection_name) + new_collection = client.collections.get(new_collection_name) + + # Count objects in both collections + old_count_response = old_collection.query.fetch_objects(limit=1) + new_count_response = new_collection.query.fetch_objects(limit=1) + + # Get aggregation for accurate counts + old_agg = old_collection.aggregate.over_all(total_count=True) + new_agg = new_collection.aggregate.over_all(total_count=True) + + old_count = old_agg.total_count + new_count = new_agg.total_count + + print(f"\nVerification:") + print(f" Old collection ({old_collection_name}): {old_count} objects") + print(f" New collection ({new_collection_name}): {new_count} objects") + + if old_count == new_count: + print(f" Status: SUCCESS - Counts match!") + return True + else: + print(f" Status: WARNING - Counts don't match!") + return False + + +def replace_old_collection( + client: weaviate.WeaviateClient, + old_collection_name: str, + new_collection_name: str +): + """Replace old collection with migrated one by recreating with original name""" + import requests + + print(f"\nReplacing old collection with migrated data...") + + # Step 1: Get data from migrated collection + print(f" Step 1: Getting data from migrated collection...") + migrated = client.collections.get(new_collection_name) + objects = migrated.query.fetch_objects(include_vector=True, limit=10000) + print(f" Found {len(objects.objects)} objects") + + # Step 2: Delete old collection + print(f" Step 2: Deleting old collection...") + response = requests.delete( + f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{old_collection_name}", + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"} + ) + if response.status_code != 200: + print(f" Warning: Could not delete old collection: {response.text}") + else: + print(f" Deleted") + + # Step 3: Get schema from migrated collection + print(f" Step 3: Getting schema from migrated collection...") + schema_response = requests.get( + f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{new_collection_name}", + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"} + ) + schema = schema_response.json() + schema["class"] = old_collection_name + + # Step 4: Create collection with original name and new schema + print(f" Step 4: Creating collection with original name...") + create_response = requests.post( + f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema", + json=schema, + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"} + ) + if create_response.status_code not in [200, 201]: + raise Exception(f"Failed to create collection: {create_response.text}") + print(f" Created") + + # Step 5: Copy data to collection with original name + print(f" Step 5: Copying data to original collection name...") + new_collection = client.collections.get(old_collection_name) + + with new_collection.batch.dynamic() as batch: + for obj in objects.objects: + batch.add_object( + properties=obj.properties, + vector=obj.vector, + uuid=obj.uuid + ) + + count = new_collection.aggregate.over_all(total_count=True).total_count + print(f" Copied {count} objects") + + # Step 6: Delete the temporary migrated collection + print(f" Step 6: Cleaning up temporary migrated collection...") + response = requests.delete( + f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{new_collection_name}", + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"} + ) + if response.status_code == 200: + print(f" Cleaned up") + + print(f"\n SUCCESS! {old_collection_name} now has the new schema with {count} objects") + return True + + +def migrate_all_collections(): + """Main migration function""" + + print("=" * 80) + print("Weaviate Collection Migration Script") + print("Migrating from Weaviate 1.19.0 schema to 1.27.0+ schema") + print("=" * 80) + print() + + client = weaviate.connect_to_local( + host=WEAVIATE_HOST, + port=WEAVIATE_PORT, + grpc_port=WEAVIATE_GRPC_PORT, + auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY) + ) + + try: + # Step 1: Identify collections that need migration + print("Step 1: Identifying collections that need migration...") + collections_to_migrate = identify_old_collections(client) + + if not collections_to_migrate: + print("\nNo collections need migration. All collections are up to date!") + return + + print(f"\nFound {len(collections_to_migrate)} collections to migrate:") + for col in collections_to_migrate: + print(f" - {col}") + + # Confirm before proceeding + print("\nThis script will:") + print("1. Create new collections with updated schema") + print("2. Copy all data using efficient batch operations") + print("3. Verify the migration") + print("4. Optionally rename collections to activate the new ones") + print() + + # Step 2: Migrate each collection + for collection_name in collections_to_migrate: + print("\n" + "=" * 80) + print(f"Migrating: {collection_name}") + print("=" * 80) + + try: + # Get old schema + schema = get_collection_schema(client, collection_name) + + # Create new collection + new_collection_name = create_new_collection(client, collection_name, schema) + + # Migrate data + migrated_count = migrate_collection_data(client, collection_name, new_collection_name) + + # Verify migration + success = verify_migration(client, collection_name, new_collection_name) + + if success and migrated_count > 0: + print(f"\nMigration successful for {collection_name}!") + print(f"New collection: {new_collection_name}") + + # Automatically replace old collection with migrated one + try: + replace_old_collection(client, collection_name, new_collection_name) + except Exception as e: + print(f"\nWarning: Could not automatically replace collection: {e}") + print(f"\nTo activate manually:") + print(f"1. Delete the old collection: {collection_name}") + print(f"2. Rename {new_collection_name} to {collection_name}") + + except Exception as e: + print(f"\nError migrating {collection_name}: {e}") + print(f"Skipping this collection and continuing...") + continue + + print("\n" + "=" * 80) + print("Migration Complete!") + print("=" * 80) + print("\nSummary:") + print(f" Collections migrated: {len(collections_to_migrate)}") + print(f"\nNext steps:") + print(f"1. Test the new collections (*_migrated)") + print(f"2. If everything works, delete or backup the old collections") + print(f"3. Rename the new collections to remove '_migrated' suffix") + + finally: + client.close() + + +if __name__ == "__main__": + try: + migrate_all_collections() + except KeyboardInterrupt: + print("\n\nMigration interrupted by user.") + sys.exit(1) + except Exception as e: + print(f"\n\nFatal error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + diff --git a/en/self-host/troubleshooting/weaviate-v4-migration.mdx b/en/self-host/troubleshooting/weaviate-v4-migration.mdx index 99a8ed5c..f1c95f36 100644 --- a/en/self-host/troubleshooting/weaviate-v4-migration.mdx +++ b/en/self-host/troubleshooting/weaviate-v4-migration.mdx @@ -105,7 +105,7 @@ Safest path. Creates a backup before upgrading so you can restore if anything go - Currently running Weaviate 1.19 - Docker + Docker Compose installed -- Python 3.11+ available for the schema migration script +- Python 3.11+ available for the [schema migration script](/assets/migrate_weaviate_collections.py) #### Step A1: Enable the Backup Module on Weaviate 1.19 @@ -237,7 +237,7 @@ sleep 15 pip install weaviate-client requests ``` -2. **Run the migration script:** +2. **Run the [migration script](/assets/migrate_weaviate_collections.py):** ```bash python3 migrate_weaviate_collections.py @@ -267,7 +267,7 @@ Only use this path if you already upgraded to 1.27+ and your knowledge bases sto - Currently running Weaviate 1.27+ (including 1.33) - Docker + Docker Compose installed -- Python 3.11+ for the migration script +- Python 3.11+ for the [migration script](/assets/migrate_weaviate_collections.py) #### Step B1: Repair Orphaned LSM Data