""" 向量搜索处理器单元测试 测试向量搜索应用层的命令和查询处理器。 使用 Mock 仓储隔离外部依赖。 """ import pytest from unittest.mock import AsyncMock, MagicMock from datetime import datetime from src.application.vector_search.handlers import ( CreateDocumentHandler, UpdateDocumentHandler, DeleteDocumentHandler, SearchDocumentsHandler, GetDocumentHandler ) from src.application.vector_search.commands import ( CreateDocumentCommand, UpdateDocumentCommand, DeleteDocumentCommand ) from src.application.vector_search.queries import ( SearchDocumentsQuery, GetDocumentQuery ) from src.application.shared.exceptions import ( ResourceNotFoundException, ValidationException, ApplicationException ) from src.domain.vector_search.entities import Document, SearchResult from src.domain.vector_search.value_objects import Vector, SearchQuery from src.domain.shared.value_objects import EntityId, Timestamp from src.domain.shared.exceptions import DomainException class TestCreateDocumentHandler: """测试创建文档处理器""" @pytest.fixture def mock_repository(self): """创建 Mock 仓储""" repository = AsyncMock() repository.save = AsyncMock() return repository @pytest.fixture def handler(self, mock_repository): """创建处理器实例""" return CreateDocumentHandler(mock_repository) @pytest.mark.asyncio async def test_handle_creates_document_successfully(self, handler, mock_repository): """测试成功创建文档""" # Arrange command = CreateDocumentCommand( content="Test document content", metadata={"source": "test"} ) # Act document_id = await handler.handle(command) # Assert assert document_id is not None assert isinstance(document_id, str) assert len(document_id) > 0 # 验证仓储的 save 方法被调用 mock_repository.save.assert_called_once() # 获取传递给 save 的文档 saved_document = mock_repository.save.call_args[0][0] assert isinstance(saved_document, Document) assert saved_document.content == "Test document content" assert saved_document.metadata == {"source": "test"} assert saved_document.embedding is None # 嵌入向量应该为 None @pytest.mark.asyncio async def test_handle_with_empty_metadata(self, handler, mock_repository): """测试使用空元数据创建文档""" # Arrange command = CreateDocumentCommand( content="Test content", metadata={} ) # Act document_id = await handler.handle(command) # Assert assert document_id is not None mock_repository.save.assert_called_once() @pytest.mark.asyncio async def test_handle_raises_application_exception_on_repository_error( self, handler, mock_repository ): """测试仓储错误时抛出应用异常""" # Arrange command = CreateDocumentCommand( content="Test content", metadata={} ) mock_repository.save.side_effect = Exception("Database error") # Act & Assert with pytest.raises(ApplicationException) as exc_info: await handler.handle(command) assert "Failed to create document" in str(exc_info.value) class TestUpdateDocumentHandler: """测试更新文档处理器""" @pytest.fixture def mock_repository(self): """创建 Mock 仓储""" repository = AsyncMock() return repository @pytest.fixture def handler(self, mock_repository): """创建处理器实例""" return UpdateDocumentHandler(mock_repository) @pytest.fixture def existing_document(self): """创建现有文档""" return Document( id=EntityId("doc_123"), content="Original content", embedding=Vector([1.0, 2.0, 3.0]), metadata={"source": "test", "version": "1.0"}, created_at=Timestamp.now(), updated_at=Timestamp.now() ) @pytest.mark.asyncio async def test_handle_updates_content_successfully( self, handler, mock_repository, existing_document ): """测试成功更新文档内容""" # Arrange mock_repository.find_by_id = AsyncMock(return_value=existing_document) mock_repository.save = AsyncMock() command = UpdateDocumentCommand( document_id="doc_123", content="Updated content" ) # Act await handler.handle(command) # Assert mock_repository.find_by_id.assert_called_once() mock_repository.save.assert_called_once() # 验证文档内容已更新 assert existing_document.content == "Updated content" # 验证嵌入向量已清除 assert existing_document.embedding is None @pytest.mark.asyncio async def test_handle_updates_metadata_with_merge( self, handler, mock_repository, existing_document ): """测试合并更新元数据""" # Arrange mock_repository.find_by_id = AsyncMock(return_value=existing_document) mock_repository.save = AsyncMock() command = UpdateDocumentCommand( document_id="doc_123", metadata={"author": "John"}, merge_metadata=True ) # Act await handler.handle(command) # Assert # 验证元数据已合并 assert existing_document.metadata["source"] == "test" assert existing_document.metadata["version"] == "1.0" assert existing_document.metadata["author"] == "John" @pytest.mark.asyncio async def test_handle_updates_metadata_without_merge( self, handler, mock_repository, existing_document ): """测试替换更新元数据""" # Arrange mock_repository.find_by_id = AsyncMock(return_value=existing_document) mock_repository.save = AsyncMock() command = UpdateDocumentCommand( document_id="doc_123", metadata={"author": "John"}, merge_metadata=False ) # Act await handler.handle(command) # Assert # 验证元数据已替换 assert existing_document.metadata == {"author": "John"} assert "source" not in existing_document.metadata assert "version" not in existing_document.metadata @pytest.mark.asyncio async def test_handle_raises_not_found_when_document_does_not_exist( self, handler, mock_repository ): """测试文档不存在时抛出 ResourceNotFoundException""" # Arrange mock_repository.find_by_id = AsyncMock(return_value=None) command = UpdateDocumentCommand( document_id="nonexistent_doc", content="New content" ) # Act & Assert with pytest.raises(ResourceNotFoundException) as exc_info: await handler.handle(command) assert "Document" in str(exc_info.value) assert "nonexistent_doc" in str(exc_info.value) class TestDeleteDocumentHandler: """测试删除文档处理器""" @pytest.fixture def mock_repository(self): """创建 Mock 仓储""" repository = AsyncMock() repository.delete = AsyncMock() return repository @pytest.fixture def handler(self, mock_repository): """创建处理器实例""" return DeleteDocumentHandler(mock_repository) @pytest.mark.asyncio async def test_handle_deletes_document_successfully(self, handler, mock_repository): """测试成功删除文档""" # Arrange command = DeleteDocumentCommand(document_id="doc_123") # Act await handler.handle(command) # Assert mock_repository.delete.assert_called_once() # 验证传递的文档 ID deleted_id = mock_repository.delete.call_args[0][0] assert str(deleted_id) == "doc_123" @pytest.mark.asyncio async def test_handle_raises_application_exception_on_repository_error( self, handler, mock_repository ): """测试仓储错误时抛出应用异常""" # Arrange command = DeleteDocumentCommand(document_id="doc_123") mock_repository.delete.side_effect = Exception("Database error") # Act & Assert with pytest.raises(ApplicationException) as exc_info: await handler.handle(command) assert "Failed to delete document" in str(exc_info.value) class TestSearchDocumentsHandler: """测试搜索文档处理器""" @pytest.fixture def mock_repository(self): """创建 Mock 仓储""" repository = AsyncMock() return repository @pytest.fixture def handler(self, mock_repository): """创建处理器实例""" return SearchDocumentsHandler(mock_repository) @pytest.fixture def search_results(self): """创建搜索结果""" doc1 = Document( id=EntityId("doc_1"), content="Machine learning document", embedding=Vector([1.0, 2.0, 3.0]), metadata={"category": "AI"}, created_at=Timestamp.now(), updated_at=Timestamp.now() ) doc2 = Document( id=EntityId("doc_2"), content="Deep learning document", embedding=Vector([2.0, 3.0, 4.0]), metadata={"category": "AI"}, created_at=Timestamp.now(), updated_at=Timestamp.now() ) return [ SearchResult(document=doc1, score=0.95, rank=0), SearchResult(document=doc2, score=0.85, rank=1) ] @pytest.mark.asyncio async def test_handle_searches_documents_successfully( self, handler, mock_repository, search_results ): """测试成功搜索文档""" # Arrange mock_repository.search = AsyncMock(return_value=search_results) query = SearchDocumentsQuery( query_text="machine learning", top_k=10 ) # Act results = await handler.handle(query) # Assert assert len(results) == 2 assert results[0].document.id == "doc_1" assert results[0].score == 0.95 assert results[1].document.id == "doc_2" assert results[1].score == 0.85 mock_repository.search.assert_called_once() @pytest.mark.asyncio async def test_handle_with_filters( self, handler, mock_repository, search_results ): """测试带过滤条件的搜索""" # Arrange mock_repository.search = AsyncMock(return_value=search_results) query = SearchDocumentsQuery( query_text="machine learning", top_k=10, filters={"category": "AI"} ) # Act results = await handler.handle(query) # Assert assert len(results) == 2 mock_repository.search.assert_called_once() # 验证传递的搜索查询 search_query = mock_repository.search.call_args[0][0] assert isinstance(search_query, SearchQuery) assert search_query.filters == {"category": "AI"} class TestGetDocumentHandler: """测试获取文档处理器""" @pytest.fixture def mock_repository(self): """创建 Mock 仓储""" repository = AsyncMock() return repository @pytest.fixture def handler(self, mock_repository): """创建处理器实例""" return GetDocumentHandler(mock_repository) @pytest.fixture def existing_document(self): """创建现有文档""" return Document( id=EntityId("doc_123"), content="Test document", embedding=Vector([1.0, 2.0, 3.0]), metadata={"source": "test"}, created_at=Timestamp.now(), updated_at=Timestamp.now() ) @pytest.mark.asyncio async def test_handle_gets_document_successfully( self, handler, mock_repository, existing_document ): """测试成功获取文档""" # Arrange mock_repository.find_by_id = AsyncMock(return_value=existing_document) query = GetDocumentQuery(document_id="doc_123") # Act result = await handler.handle(query) # Assert assert result.id == "doc_123" assert result.content == "Test document" assert result.metadata == {"source": "test"} mock_repository.find_by_id.assert_called_once() @pytest.mark.asyncio async def test_handle_raises_not_found_when_document_does_not_exist( self, handler, mock_repository ): """测试文档不存在时抛出 ResourceNotFoundException""" # Arrange mock_repository.find_by_id = AsyncMock(return_value=None) query = GetDocumentQuery(document_id="nonexistent_doc") # Act & Assert with pytest.raises(ResourceNotFoundException) as exc_info: await handler.handle(query) assert "Document" in str(exc_info.value) assert "nonexistent_doc" in str(exc_info.value) @pytest.mark.asyncio async def test_handle_with_include_embedding( self, handler, mock_repository, existing_document ): """测试包含嵌入向量的获取""" # Arrange mock_repository.find_by_id = AsyncMock(return_value=existing_document) query = GetDocumentQuery( document_id="doc_123", include_embedding=True ) # Act result = await handler.handle(query) # Assert assert result.embedding is not None assert result.embedding == [1.0, 2.0, 3.0]