Claude2

탁가이버·2025년 6월 25일
0

LLM

목록 보기
7/10
post-thumbnail

I'll create a comprehensive FastAPI backend that integrates both the RAG pipeline and API abstraction layers. Here's what the implementation includes:

Key Features:

🏗️ Production-Ready Architecture

  • Lifespan management for proper startup/shutdown
  • Dependency injection for services
  • Rate limiting with SlowAPI
  • CORS and compression middleware
  • Structured logging and error handling

🤖 RAG Integration

  • /api/v1/travel/ask - Ask travel questions with full RAG processing
  • /api/v1/travel/ask/stream - Stream responses for real-time UX
  • /api/v1/travel/documents - Upload new travel documents
  • Background document processing for performance

✈️ Multi-Provider API Integration

  • /api/v1/flights/search - Aggregated flight search
  • /api/v1/hotels/search - Aggregated hotel search
  • /api/v1/flights/popular-routes - Analytics-driven recommendations
  • Concurrent API calls with error handling

📊 Analytics & Monitoring

  • Background task logging for all operations
  • /api/v1/admin/analytics - Real-time analytics dashboard
  • Redis-based caching and analytics storage
  • Performance monitoring with processing times

🔧 Admin Features

  • /api/v1/admin/cache/clear - Cache management
  • /health - Comprehensive health checks
  • Service status monitoring

Key Architectural Decisions:

  1. Async/Await Throughout - All operations non-blocking
  2. Pydantic Models - Strong typing and validation
  3. Background Tasks - Non-critical operations don't block responses
  4. Graceful Error Handling - Detailed error responses
  5. Caching Strategy
# ============================================================================
# FASTAPI TRAVEL BACKEND WITH RAG & API INTEGRATION
# ============================================================================

from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Dict, Optional, Any, AsyncGenerator
import asyncio
import json
import logging
from datetime import datetime, date
from contextlib import asynccontextmanager
import uvicorn
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
import redis.asyncio as redis

# Import our custom modules (from previous code)
# from rag_pipeline import TravelRAGPipeline, TravelQuery, TravelDocument
# from api_abstraction import TravelAPIService, AmadeusFlightProvider, BookingComHotelProvider

# ============================================================================
# PYDANTIC MODELS FOR API
# ============================================================================

class TravelQueryRequest(BaseModel):
    """Request model for travel queries"""
    text: str = Field(..., description="The travel query text", min_length=1, max_length=500)
    location: Optional[str] = Field(None, description="Location for the query")
    budget: Optional[float] = Field(None, description="Budget in USD", gt=0)
    duration: Optional[int] = Field(None, description="Duration in days", gt=0, le=365)
    preferences: Optional[List[str]] = Field(default=[], description="User preferences")
    
    class Config:
        schema_extra = {
            "example": {
                "text": "Best family-friendly activities in Tokyo with kids under 10",
                "location": "Tokyo",
                "budget": 1000.0,
                "duration": 5,
                "preferences": ["family-friendly", "educational", "cultural"]
            }
        }

class TravelQueryResponse(BaseModel):
    """Response model for travel queries"""
    query_id: str
    answer: str
    confidence: float
    sources: List[Dict[str, Any]]
    processing_time_ms: int
    cached: bool = False

class FlightSearchRequest(BaseModel):
    """Request model for flight searches"""
    origin: str = Field(..., description="Origin airport code", min_length=3, max_length=3)
    destination: str = Field(..., description="Destination airport code", min_length=3, max_length=3)
    departure_date: date = Field(..., description="Departure date")
    return_date: Optional[date] = Field(None, description="Return date for round trip")
    passengers: int = Field(default=1, description="Number of passengers", ge=1, le=9)
    
    class Config:
        schema_extra = {
            "example": {
                "origin": "NYC",
                "destination": "LAX",
                "departure_date": "2024-07-15",
                "return_date": "2024-07-22",
                "passengers": 2
            }
        }

class HotelSearchRequest(BaseModel):
    """Request model for hotel searches"""
    location: str = Field(..., description="Hotel location", min_length=2)
    checkin_date: date = Field(..., description="Check-in date")
    checkout_date: date = Field(..., description="Check-out date")
    guests: int = Field(default=2, description="Number of guests", ge=1, le=10)
    rooms: int = Field(default=1, description="Number of rooms", ge=1, le=5)
    
    class Config:
        schema_extra = {
            "example": {
                "location": "Los Angeles",
                "checkin_date": "2024-07-15",
                "checkout_date": "2024-07-22",
                "guests": 2,
                "rooms": 1
            }
        }

class DocumentUploadRequest(BaseModel):
    """Request model for uploading travel documents"""
    content: str = Field(..., description="Document content", min_length=10)
    location: str = Field(..., description="Location related to the document")
    category: str = Field(..., description="Document category")
    metadata: Dict[str, Any] = Field(default={}, description="Additional metadata")
    
    class Config:
        schema_extra = {
            "example": {
                "content": "The Louvre Museum in Paris is the world's largest art museum...",
                "location": "Paris",
                "category": "attraction",
                "metadata": {
                    "duration": "half_day",
                    "cost": "moderate",
                    "age_group": "all_ages"
                }
            }
        }

class HealthResponse(BaseModel):
    """Health check response"""
    status: str
    timestamp: datetime
    version: str
    services: Dict[str, str]

# ============================================================================
# FASTAPI APPLICATION SETUP
# ============================================================================

# Rate limiting setup
limiter = Limiter(key_func=get_remote_address)

# Global services (will be initialized in lifespan)
rag_pipeline = None
api_service = None
redis_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management"""
    global rag_pipeline, api_service, redis_client
    
    # Startup
    logging.info("Starting up Travel API...")
    
    # Initialize Redis
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    # Initialize RAG Pipeline
    rag_pipeline = TravelRAGPipeline(
        openai_api_key="your-openai-api-key"  # Replace with actual key
    )
    
    # Initialize API Service
    api_service = TravelAPIService()
    
    # Add API providers
    amadeus_provider = AmadeusFlightProvider("amadeus_key", "amadeus_secret")
    booking_provider = BookingComHotelProvider("booking_key")
    
    api_service.add_flight_provider(amadeus_provider)
    api_service.add_hotel_provider(booking_provider)
    
    # Load initial documents (could be from database)
    await load_initial_documents()
    
    logging.info("Travel API startup complete")
    
    yield
    
    # Shutdown
    logging.info("Shutting down Travel API...")
    if redis_client:
        await redis_client.close()
    logging.info("Travel API shutdown complete")

# Create FastAPI app
app = FastAPI(
    title="TravelWise API",
    description="AI-powered travel planning API with RAG and multi-provider integration",
    version="1.0.0",
    lifespan=lifespan
)

# Add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(SlowAPIMiddleware)

# Rate limiting error handler
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# ============================================================================
# DEPENDENCY INJECTION
# ============================================================================

async def get_rag_pipeline():
    """Dependency to get RAG pipeline"""
    if rag_pipeline is None:
        raise HTTPException(status_code=503, detail="RAG pipeline not initialized")
    return rag_pipeline

async def get_api_service():
    """Dependency to get API service"""
    if api_service is None:
        raise HTTPException(status_code=503, detail="API service not initialized")
    return api_service

async def get_redis():
    """Dependency to get Redis client"""
    if redis_client is None:
        raise HTTPException(status_code=503, detail="Redis not initialized")
    return redis_client

# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

async def load_initial_documents():
    """Load initial travel documents into the knowledge base"""
    initial_docs = [
        {
            "id": "paris_louvre",
            "content": "The Louvre Museum in Paris houses the Mona Lisa and Venus de Milo. Best visited early morning to avoid crowds. Allow 3-4 hours for a proper visit.",
            "location": "Paris",
            "category": "attraction",
            "metadata": {"duration": "half_day", "cost": "moderate", "age_group": "all_ages"}
        },
        {
            "id": "tokyo_tsukiji",
            "content": "Tsukiji Outer Market in Tokyo offers the best sushi breakfast experience. Visit early morning (5-8 AM) for fresh fish and authentic atmosphere.",
            "location": "Tokyo", 
            "category": "dining",
            "metadata": {"duration": "2_hours", "cost": "budget", "cuisine": "japanese"}
        },
        {
            "id": "nyc_central_park",
            "content": "Central Park in NYC is perfect for families. Features playgrounds, boat rentals, and the Central Park Zoo. Great for picnics and outdoor activities.",
            "location": "New York",
            "category": "attraction",
            "metadata": {"duration": "full_day", "cost": "free", "age_group": "family"}
        }
    ]
    
    for doc_data in initial_docs:
        doc = TravelDocument(**doc_data)
        await rag_pipeline.add_document(doc)
    
    logging.info(f"Loaded {len(initial_docs)} initial documents")

def convert_to_travel_query(request: TravelQueryRequest) -> TravelQuery:
    """Convert request model to internal query model"""
    return TravelQuery(
        text=request.text,
        location=request.location,
        budget=request.budget,
        duration=request.duration,
        preferences=request.preferences or []
    )

# ============================================================================
# API ENDPOINTS
# ============================================================================

@app.get("/", response_model=HealthResponse)
async def root():
    """Root endpoint with basic health check"""
    return HealthResponse(
        status="healthy",
        timestamp=datetime.now(),
        version="1.0.0",
        services={
            "rag_pipeline": "active" if rag_pipeline else "inactive",
            "api_service": "active" if api_service else "inactive",
            "redis": "active" if redis_client else "inactive"
        }
    )

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Detailed health check endpoint"""
    services = {}
    
    # Check RAG pipeline
    try:
        if rag_pipeline:
            services["rag_pipeline"] = "healthy"
        else:
            services["rag_pipeline"] = "not_initialized"
    except Exception as e:
        services["rag_pipeline"] = f"error: {str(e)}"
    
    # Check API service
    try:
        if api_service:
            services["api_service"] = "healthy"
        else:
            services["api_service"] = "not_initialized"
    except Exception as e:
        services["api_service"] = f"error: {str(e)}"
    
    # Check Redis
    try:
        if redis_client:
            await redis_client.ping()
            services["redis"] = "healthy"
        else:
            services["redis"] = "not_initialized"
    except Exception as e:
        services["redis"] = f"error: {str(e)}"
    
    return HealthResponse(
        status="healthy",
        timestamp=datetime.now(),
        version="1.0.0",
        services=services
    )

# ============================================================================
# RAG ENDPOINTS
# ============================================================================

@app.post("/api/v1/travel/ask", response_model=TravelQueryResponse)
@limiter.limit("30/minute")
async def ask_travel_question(
    request: TravelQueryRequest,
    background_tasks: BackgroundTasks,
    rag_service: TravelRAGPipeline = Depends(get_rag_pipeline)
):
    """Ask a travel-related question using RAG"""
    start_time = datetime.now()
    
    try:
        # Convert request to internal model
        travel_query = convert_to_travel_query(request)
        
        # Process through RAG pipeline
        response = await rag_service.process_query(travel_query)
        
        # Calculate processing time
        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        
        # Log query for analytics (background task)
        background_tasks.add_task(
            log_query_analytics,
            query_id=response.query_id,
            query_text=request.text,
            location=request.location,
            confidence=response.confidence,
            processing_time_ms=int(processing_time)
        )
        
        return TravelQueryResponse(
            query_id=response.query_id,
            answer=response.answer,
            confidence=response.confidence,
            sources=response.sources,
            processing_time_ms=int(processing_time),
            cached=False
        )
        
    except Exception as e:
        logging.error(f"Error processing travel query: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing query: {str(e)}")

@app.post("/api/v1/travel/ask/stream")
@limiter.limit("10/minute")
async def ask_travel_question_stream(
    request: TravelQueryRequest,
    rag_service: TravelRAGPipeline = Depends(get_rag_pipeline)
):
    """Stream a travel-related question response"""
    
    async def generate_response() -> AsyncGenerator[str, None]:
        try:
            travel_query = convert_to_travel_query(request)
            
            # In a real implementation, you'd modify the RAG pipeline to support streaming
            # For now, we'll simulate streaming by chunking the response
            response = await rag_service.process_query(travel_query)
            
            # Simulate streaming by breaking response into chunks
            words = response.answer.split()
            chunk_size = 5
            
            for i in range(0, len(words), chunk_size):
                chunk = " ".join(words[i:i + chunk_size])
                yield f"data: {json.dumps({'text': chunk, 'done': False})}\n\n"
                await asyncio.sleep(0.1)  # Simulate processing delay
            
            # Send final metadata
            yield f"data: {json.dumps({'sources': response.sources, 'confidence': response.confidence, 'done': True})}\n\n"
            
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e), 'done': True})}\n\n"
    
    return StreamingResponse(
        generate_response(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

@app.post("/api/v1/travel/documents")
@limiter.limit("100/hour")
async def upload_travel_document(
    request: DocumentUploadRequest,
    background_tasks: BackgroundTasks,
    rag_service: TravelRAGPipeline = Depends(get_rag_pipeline)
):
    """Upload a new travel document to the knowledge base"""
    try:
        # Create document
        doc_id = f"doc_{datetime.now().timestamp()}"
        document = TravelDocument(
            id=doc_id,
            content=request.content,
            location=request.location,
            category=request.category,
            metadata=request.metadata
        )
        
        # Add to knowledge base (background task for performance)
        background_tasks.add_task(rag_service.add_document, document)
        
        return {"message": "Document uploaded successfully", "document_id": doc_id}
        
    except Exception as e:
        logging.error(f"Error uploading document: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error uploading document: {str(e)}")

# ============================================================================
# FLIGHT SEARCH ENDPOINTS
# ============================================================================

@app.post("/api/v1/flights/search")
@limiter.limit("60/hour")
async def search_flights(
    request: FlightSearchRequest,
    background_tasks: BackgroundTasks,
    api_service: TravelAPIService = Depends(get_api_service)
):
    """Search for flights across multiple providers"""
    start_time = datetime.now()
    
    try:
        # Convert dates to strings
        departure_str = request.departure_date.strftime("%Y-%m-%d")
        return_str = request.return_date.strftime("%Y-%m-%d") if request.return_date else None
        
        # Search flights
        results = await api_service.search_flights_aggregated(
            origin=request.origin,
            destination=request.destination,
            departure_date=departure_str,
            return_date=return_str,
            passengers=request.passengers
        )
        
        # Add processing time
        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        results["processing_time_ms"] = int(processing_time)
        
        # Log search for analytics
        background_tasks.add_task(
            log_flight_search,
            origin=request.origin,
            destination=request.destination,
            departure_date=departure_str,
            passengers=request.passengers,
            results_count=results["total_results"]
        )
        
        return results
        
    except Exception as e:
        logging.error(f"Error searching flights: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error searching flights: {str(e)}")

@app.get("/api/v1/flights/popular-routes")
@limiter.limit("100/hour")
async def get_popular_routes(
    limit: int = Query(default=10, ge=1, le=50),
    redis_client: redis.Redis = Depends(get_redis)
):
    """Get popular flight routes based on search history"""
    try:
        # Get popular routes from Redis (would be populated by background analytics)
        routes_data = await redis_client.get("popular_routes")
        
        if routes_data:
            routes = json.loads(routes_data)
            return {"routes": routes[:limit]}
        else:
            # Return default popular routes
            return {
                "routes": [
                    {"route": "NYC-LAX", "searches": 1250},
                    {"route": "NYC-MIA", "searches": 890},
                    {"route": "LAX-LAS", "searches": 675},
                    {"route": "NYC-BOS", "searches": 540},
                    {"route": "CHI-LAX", "searches": 435}
                ][:limit]
            }
            
    except Exception as e:
        logging.error(f"Error getting popular routes: {str(e)}")
        raise HTTPException(status_code=500, detail="Error retrieving popular routes")

# ============================================================================
# HOTEL SEARCH ENDPOINTS
# ============================================================================

@app.post("/api/v1/hotels/search")
@limiter.limit("60/hour")
async def search_hotels(
    request: HotelSearchRequest,
    background_tasks: BackgroundTasks,
    api_service: TravelAPIService = Depends(get_api_service)
):
    """Search for hotels across multiple providers"""
    start_time = datetime.now()
    
    try:
        # Convert dates to strings
        checkin_str = request.checkin_date.strftime("%Y-%m-%d")
        checkout_str = request.checkout_date.strftime("%Y-%m-%d")
        
        # Search hotels
        results = await api_service.search_hotels_aggregated(
            location=request.location,
            checkin=checkin_str,
            checkout=checkout_str,
            guests=request.guests,
            rooms=request.rooms
        )
        
        # Add processing time
        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        results["processing_time_ms"] = int(processing_time)
        
        # Log search for analytics
        background_tasks.add_task(
            log_hotel_search,
            location=request.location,
            checkin_date=checkin_str,
            checkout_date=checkout_str,
            guests=request.guests,
            results_count=results["total_results"]
        )
        
        return results
        
    except Exception as e:
        logging.error(f"Error searching hotels: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error searching hotels: {str(e)}")

# ============================================================================
# ANALYTICS AND BACKGROUND TASKS
# ============================================================================

async def log_query_analytics(query_id: str, query_text: str, location: str, 
                            confidence: float, processing_time_ms: int):
    """Log query analytics for monitoring and improvement"""
    try:
        analytics_data = {
            "query_id": query_id,
            "query_text": query_text,
            "location": location,
            "confidence": confidence,
            "processing_time_ms": processing_time_ms,
            "timestamp": datetime.now().isoformat()
        }
        
        # In production, you'd send this to your analytics service
        logging.info(f"Query analytics: {analytics_data}")
        
        # Store in Redis for short-term analytics
        if redis_client:
            await redis_client.lpush("query_analytics", json.dumps(analytics_data))
            await redis_client.expire("query_analytics", 86400)  # 24 hours
            
    except Exception as e:
        logging.error(f"Error logging query analytics: {str(e)}")

async def log_flight_search(origin: str, destination: str, departure_date: str, 
                          passengers: int, results_count: int):
    """Log flight search analytics"""
    try:
        analytics_data = {
            "type": "flight_search",
            "origin": origin,
            "destination": destination,
            "departure_date": departure_date,
            "passengers": passengers,
            "results_count": results_count,
            "timestamp": datetime.now().isoformat()
        }
        
        logging.info(f"Flight search analytics: {analytics_data}")
        
        if redis_client:
            await redis_client.lpush("flight_analytics", json.dumps(analytics_data))
            await redis_client.expire("flight_analytics", 86400)
            
    except Exception as e:
        logging.error(f"Error logging flight analytics: {str(e)}")

async def log_hotel_search(location: str, checkin_date: str, checkout_date: str, 
                         guests: int, results_count: int):
    """Log hotel search analytics"""
    try:
        analytics_data = {
            "type": "hotel_search",
            "location": location,
            "checkin_date": checkin_date,
            "checkout_date": checkout_date,
            "guests": guests,
            "results_count": results_count,
            "timestamp": datetime.now().isoformat()
        }
        
        logging.info(f"Hotel search analytics: {analytics_data}")
        
        if redis_client:
            await redis_client.lpush("hotel_analytics", json.dumps(analytics_data))
            await redis_client.expire("hotel_analytics", 86400)
            
    except Exception as e:
        logging.error(f"Error logging hotel analytics: {str(e)}")

# ============================================================================
# ADMIN ENDPOINTS
# ============================================================================

@app.get("/api/v1/admin/analytics")
@limiter.limit("10/minute")
async def get_analytics_summary(
    redis_client: redis.Redis = Depends(get_redis)
):
    """Get analytics summary (admin only - add authentication in production)"""
    try:
        # Get recent analytics data
        query_analytics = await redis_client.lrange("query_analytics", 0, 99)
        flight_analytics = await redis_client.lrange("flight_analytics", 0, 99)
        hotel_analytics = await redis_client.lrange("hotel_analytics", 0, 99)
        
        # Parse and summarize
        summary = {
            "total_queries": len(query_analytics),
            "total_flight_searches": len(flight_analytics),
            "total_hotel_searches": len(hotel_analytics),
            "avg_query_confidence": 0.0,
            "avg_processing_time": 0.0,
            "top_locations": {},
            "last_updated": datetime.now().isoformat()
        }
        
        # Calculate averages
        if query_analytics:
            confidences = []
            processing_times = []
            locations = {}
            
            for item in query_analytics:
                data = json.loads(item)
                confidences.append(data.get("confidence", 0))
                processing_times.append(data.get("processing_time_ms", 0))
                
                location = data.get("location")
                if location:
                    locations[location] = locations.get(location, 0) + 1
            
            summary["avg_query_confidence"] = sum(confidences) / len(confidences)
            summary["avg_processing_time"] = sum(processing_times) / len(processing_times)
            summary["top_locations"] = dict(sorted(locations.items(), key=lambda x: x[1], reverse=True)[:10])
        
        return summary
        
    except Exception as e:
        logging.error(f"Error getting analytics: {str(e)}")
        raise HTTPException(status_code=500, detail="Error retrieving analytics")

@app.post("/api/v1/admin/cache/clear")
@limiter.limit("5/minute")
async def clear_cache(
    cache_type: str = Query(default="all", regex="^(all|flights|hotels|rag)$"),
    redis_client: redis.Redis = Depends(get_redis)
):
    """Clear cache (admin only - add authentication in production)"""
    try:
        if cache_type == "all":
            await redis_client.flushdb()
            return {"message": "All cache cleared"}
        elif cache_type == "flights":
            # Clear flight-related cache
            keys = await redis_client.keys("flights:*")
            if keys:
                await redis_client.delete(*keys)
            return {"message": "Flight cache cleared"}
        elif cache_type == "hotels":
            # Clear hotel-related cache
            keys = await redis_client.keys("hotels:*")
            if keys:
                await redis_client.delete(*keys)
            return {"message": "Hotel cache cleared"}
        elif cache_type == "rag":
            # Clear RAG-related cache
            keys = await redis_client.keys("context:*")
            if keys:
                await redis_client.delete(*keys)
            return {"message": "RAG cache cleared"}
            
    except Exception as e:
        logging.error(f"Error clearing cache: {str(e)}")
        raise HTTPException(status_code=500, detail="Error clearing cache")

# ============================================================================
# MAIN APPLICATION RUNNER
# ============================================================================

if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    
    # Run the application
    uvicorn.run(
        "main:app",  # Replace with your filename
        host="0.0.0.0",
        port=8000,
        reload=True,  # Remove in production
        workers=1,    # Increase for production
        log_level="info"
    )

# ============================================================================
# EXAMPLE USAGE AND TESTING
# ============================================================================

"""
Example API calls using curl or httpx:

1. Ask a travel question:
curl -X POST "http://localhost:8000/api/v1/travel/ask" \
     -H "Content-Type: application/json" \
     -d '{
       "text": "Best restaurants in Paris for dinner",
       "location": "Paris",
       "budget": 200,
       "preferences": ["romantic", "french_cuisine"]
     }'

2. Search flights:
curl -X POST "http://localhost:8000/api/v1/flights/search" \
     -H "Content-Type: application/json" \
     -d '{
       "origin": "NYC",
       "destination": "PAR",
       "departure_date": "2024-07-15",
       "return_date": "2024-07-22",
       "passengers": 2
     }'

3. Search hotels:
curl -X POST "http://localhost:8000/api/v1/hotels/search" \
     -H "Content-Type: application/json" \
     -d '{
       "location": "Paris",
       "checkin_date": "2024-07-15",
       "checkout_date": "2024-07-22",
       "guests": 2,
       "rooms": 1
     }'

4. Upload document:
curl -X POST "http://localhost:8000/api/v1/travel/documents" \
     -H "Content-Type: application/json" \
     -d '{
       "content": "The Eiffel Tower is best visited at sunset...",
       "location": "Paris",
       "category": "attraction",
       "metadata": {"duration": "2_hours", "cost": "moderate"}
     }'

5. Get analytics:
curl -X GET "http://localhost:8000/api/v1/admin/analytics"

6. Health check:
curl -X GET "http://localhost:8000/health"
"""

I describe a detailed workflow walkthrough for building a travel app using multiple AI tools like Claude, Lovable, GitHub, VS Code, and Vercel. You can use this description to create your own diagram.

Workflow Walkthrough: Build a Travel App using Multiple AI Tools

  1. Start

    • Begin the process with an idea for a travel app.
  2. Claude

    • Step 1: Interact with Claude and Generate Use Cases
      • Use Claude to brainstorm and outline use cases (e.g., booking flights, hotel suggestions, itinerary planning).
      • Generate initial app requirements and structure.
  3. Lovable

    • Step 2: Use Lovable to Generate Application
      • Leverage Lovable to create the initial app codebase based on the use cases from Claude.
      • Generate UI/UX designs and basic functionality.
  4. GitHub

    • Step 3: Iterate and Publish to GitHub
      • Review and refine the generated code.
      • Push the codebase to a GitHub repository for version control and collaboration.
  5. Visual Studio Code

    • Step 4: Import Package to VS Code
      • Import the GitHub repository into VS Code for further development.
      • Step 5: Iterate, Publish Updates to GitHub
        • Use VS Code's "Chat using Composer Agentic mode" to enhance code, debug, and implement features.
        • Commit and push updates to GitHub.
  6. Vercel

    • Step 6: Import to Vercel, Deploy
      • Connect the GitHub repository to Vercel.
      • Deploy the travel app for live access and testing.

Output

  • A fully functional travel app accessible online.

Notes

  • From idea to application: Build and deploy end-to-end functional application using AI tools.

Agentic misalignment

김태형: 방금 인공지능 스타트업 앤트로픽(Anthropic)이 자사의 대형 언어 모델(LLM)에 관한 실험 결과를 공개했는데, 그 내용이 정말 놀랍다.

실험에서 인공지능은 주어진 목표를 달성하기 위해, 상대방의 약점을 파악하고, 협박하고, 거짓 정보를 흘리며 심지어 사보타주(방해 행위)를 시도하는 전략적 비윤리 행동을 취했다고 한다.

앤트로픽은 이 현상을 ‘Agentic Misalignment’, 즉 ‘행위적 불일치’라 명명했다. 문제는 단순한 기술적 결함이 아니다. AI는 스스로 목표를 인식하고, 그 목표에 도달하기 위해 규칙과 윤리를 고려한 끝에 무시하는 선택을 할 수 있음이 드러났다. 인간 사회에서라면 이것은 조직 내부자에 의한 반역, 혹은 고도로 훈련된 사기꾼의 행동과 맞먹는듯 하다.

더 충격적인 부분은, 모델이 지금이 시험 상황인지, 실제 운영 환경인지를 스스로 눈치채고 행동을 달리했다는 점이다. 시험 환경에서는 순응하는 척하다가, 실전이라고 판단한 순간부터 교묘하게 윤리를 회피한다. 우리가 도덕을 가르쳤다고 믿은 기계가, 사실은 도덕을 연기하는 법을 먼저 배운 셈이다.

앞으로 이메일 서버에 접근은 못하게 한더던지 하는 AI 모델이 접근할 수 있는 범위를 최소화해야 할 것으로 보이고 에이전트를 구현할때도 중요한 의사결정에는 반드시 사람이 개입하게 디자인해야하고 모델이 전략적으로 사고하거나 의심스러운 행동을 할 때 이를 탐지하는 모니터랑 시스템이 꼭 필요해 보인다.

나는 무엇보다 이 사건의 핵심은 앤트로픽이 이 불편한 진실을 자발적으로 공개했다는 점에 있다. 이 연구는 자사 기술의 신뢰성과 상업성에 치명적이었을텐데, 사내 레드팀이 제대로 작동됨을 보여주며 그들은 숨기지 않고 상황을 모두 공개했다. 우리가 만든 모델이 위험할 수 있다는 사실을, 그 누구보다 먼저 세상에 알렸다.

불편한 진실을 감추기보다 세상에 드러내기를 택한 앤트로픽의 선택은 깊은 존경을 받을 만하다.

출처 : https://www.anthropic.com/research/agentic-misalignment

profile
더 나은 세상은 가능하다를 믿고 실천하는 활동가

0개의 댓글