I've created a comprehensive refactoring plan for "TravelWise", a travel application, focused on API integration, retrieval-augmented generation (RAG), and database scalability. It includes specific use cases and actionable refactoring strategies. The plan includes:
Key highlights:
The plan addresses common scalability challenges like API rate limiting, database bottlenecks, and complex travel queries while building a robust foundation for learning these technologies.
Core Features:
Frontend → API Gateway → Microservices → External APIs
Actions:
Flight APIs (Amadeus, Skyscanner):
Hotel APIs (Booking.com, Expedia):
Maps & Places (Google Maps, Foursquare):
Scenario: User asks "Best family-friendly activities in Seoul with kids under 10"
RAG Components:
Scenario: "Plan a 5-day cultural trip to Rome under $2000"
Implementation:
User Query → Embedding → Vector Search → Context Retrieval → LLM → Structured Itinerary
Scenario: "What's the current situation at JFK airport?"
Data Sources:
Travel Content Sources:
Vector Storage:
# Example pipeline structure
Travel Content → Text Chunking → Embedding Model → Vector DB
Content Processing:
Primary Databases:
PostgreSQL (ACID transactions) → User accounts, bookings, payments
MongoDB (Document store) → Travel content, reviews, itineraries
Redis (Cache/Session) → Search results, user sessions, rate limiting
Elasticsearch → Full-text search, analytics, recommendations
Vector DB → RAG embeddings, similarity search
Read Scaling:
Write Scaling:
High Availability:
Data Consistency:
I've created code examples for both the RAG pipeline and API abstraction layers for the travel application. Here's what the code includes:
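The code demonstrates resilience patterns such as retries with exponential backoff (via tenacity), concurrent async/await calls to multiple providers, per-provider error handling, and Redis caching of results; circuit breakers are not implemented yet. To run the example at the bottom and see both systems working together, you need real API credentials (OpenAI, Pinecone, Amadeus, Skyscanner, Booking.com) and a Redis server on localhost.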
# ============================================================================
# RAG PIPELINE IMPLEMENTATION
# ============================================================================
import asyncio
import hashlib
import json
import logging
import os
import re
import time
import uuid
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timedelta

import numpy as np
import openai
import redis
from pinecone import Pinecone
from sentence_transformers import SentenceTransformer
# Data Models
@dataclass
class TravelDocument:
    """A single piece of travel knowledge destined for the vector index."""

    id: str  # used as the vector ID when upserting
    content: str  # free text that gets embedded and stored as metadata
    location: str
    category: str  # attraction, restaurant, hotel, activity
    metadata: Dict[str, Any]  # extra fields merged into the stored vector metadata
    embedding: Optional[List[float]] = None  # filled in on demand before storage
@dataclass
class TravelQuery:
    """A user's free-text travel question plus optional structured constraints."""

    text: str
    location: Optional[str] = None
    budget: Optional[float] = None
    duration: Optional[int] = None  # trip length in days
    # Optional[...] because the default is None; the previous `List[str] = None`
    # annotation misdescribed the field. The default stays None (not []) so
    # callers that test `preferences is None` keep working.
    preferences: Optional[List[str]] = None
@dataclass
class RAGResponse:
    """Answer produced by the RAG pipeline together with its provenance."""

    answer: str  # LLM output, or an apology message when generation failed
    sources: List[Dict[str, Any]]  # previews of the high-scoring documents
    confidence: float  # heuristic in [0, 1] derived from retrieval scores
    query_id: str  # "q_..." on success, "error_..." on failure
# ============================================================================
# RAG PIPELINE COMPONENTS
# ============================================================================
class TravelEmbeddingService:
    """Embed travel documents and run semantic search against a Pinecone index.

    SentenceTransformer encoding and the Pinecone client calls are synchronous.
    They are dispatched to the default executor rather than run directly inside
    the ``async`` methods, where they would stall the event loop.

    Args:
        model_name: SentenceTransformer model used for documents and queries.
        pinecone_api_key: Pinecone API key. If omitted, ``PINECONE_API_KEY`` is
            read from the environment, falling back to the old placeholder.
        index_name: Pinecone index to read from and write to.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2",
                 pinecone_api_key: Optional[str] = None,
                 index_name: str = "travel-knowledge"):
        self.model = SentenceTransformer(model_name)
        api_key = pinecone_api_key or os.environ.get("PINECONE_API_KEY", "your-pinecone-key")
        self.pinecone = Pinecone(api_key=api_key)
        self.index = self.pinecone.Index(index_name)

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a synchronous callable in the default executor and await its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def embed_document(self, document: TravelDocument) -> TravelDocument:
        """Compute an embedding for *document* and attach it in place.

        Location and category are appended to the content before encoding, so
        they influence the vector.

        Returns:
            The same ``document`` object, with ``embedding`` set to a list of floats.
        """
        combined_text = f"{document.content} Location: {document.location} Category: {document.category}"
        embedding = await self._run_blocking(self.model.encode, combined_text)
        document.embedding = embedding.tolist()
        return document

    async def store_document(self, document: TravelDocument):
        """Upsert *document* into the index, embedding it first if needed."""
        if not document.embedding:
            document = await self.embed_document(document)
        # Core fields are written after the free-form metadata. This stops a stray
        # "content"/"location"/"category" key from overwriting them; retrieval and
        # the location filter depend on these exact values.
        metadata = {
            **document.metadata,
            "content": document.content,
            "location": document.location,
            "category": document.category,
        }
        await self._run_blocking(self.index.upsert, [(document.id, document.embedding, metadata)])

    async def semantic_search(self, query: TravelQuery, top_k: int = 5) -> List[Dict]:
        """Return the ``top_k`` index matches for *query*.

        Location and preferences are appended to the query text before encoding.
        When ``query.location`` is set, results are also restricted to documents
        whose stored ``location`` equals it exactly (``$eq`` filter).
        """
        query_text = query.text
        if query.location:
            query_text += f" in {query.location}"
        if query.preferences:
            query_text += f" preferences: {', '.join(query.preferences)}"

        encoded = await self._run_blocking(self.model.encode, query_text)
        query_embedding = encoded.tolist()

        filter_dict = {}
        if query.location:
            filter_dict["location"] = {"$eq": query.location}

        results = await self._run_blocking(
            self.index.query,
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True,
            filter=filter_dict or None,
        )
        return results.matches
class TravelContextRetriever:
    """Retrieve, partition and cache context for travel queries.

    Args:
        embedding_service: Service used to run the semantic search.
        redis_client: Optional Redis client; defaults to ``localhost:6379`` db 0.
        cache_ttl: Seconds a retrieved context stays in the cache.
        primary_score_threshold: Matches scoring strictly above this value go to
            ``primary_sources``; all others go to ``related_info``.
    """

    _log = logging.getLogger(__name__)

    # Intent keywords are checked in order, and the first hit wins. Each pattern is
    # anchored at a word start so short keywords no longer fire inside unrelated
    # words. Plain substring checks classified "weather in Rome" and "things to do
    # in Seattle" as dining, because both contain "eat". Plurals and inflections
    # ("hotels", "booking", "eating") still match.
    _INTENT_PATTERNS = (
        ("dining", re.compile(r"\b(?:restaurant|food|eat|dining)")),
        ("accommodation", re.compile(r"\b(?:hotel|stay|accommodation)")),
        ("activities", re.compile(r"\b(?:activity|things to do|attractions)")),
        ("booking", re.compile(r"\b(?:flight|airline|book)")),
    )

    def __init__(self, embedding_service: TravelEmbeddingService,
                 redis_client: Optional[redis.Redis] = None,
                 cache_ttl: int = 300,
                 primary_score_threshold: float = 0.8):
        self.embedding_service = embedding_service
        self.redis_client = (
            redis_client if redis_client is not None
            else redis.Redis(host='localhost', port=6379, db=0)
        )
        self.cache_ttl = cache_ttl
        self.primary_score_threshold = primary_score_threshold

    @staticmethod
    def _cache_key(query: TravelQuery) -> str:
        """Build a stable Redis key from every query field that affects retrieval.

        The old key used ``hash()`` of a str, which is randomized per process
        (PYTHONHASHSEED), so it never matched across workers or restarts. It also
        ignored ``preferences``, which change the search text. Its bare
        concatenation made e.g. ("ab", "c") and ("a", "bc") collide.
        """
        payload = json.dumps([query.text, query.location, query.preferences or []])
        return "context:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def _cache_get(self, key: str):
        """Read *key* from Redis; on Redis errors, log and behave as a cache miss."""
        try:
            return self.redis_client.get(key)
        except redis.RedisError:
            self._log.warning("Context cache read failed for %s", key, exc_info=True)
            return None

    def _cache_set(self, key: str, value: str) -> None:
        """Store *value* under *key* for ``cache_ttl`` seconds; log Redis errors."""
        try:
            self.redis_client.setex(key, self.cache_ttl, value)
        except redis.RedisError:
            self._log.warning("Context cache write failed for %s", key, exc_info=True)

    async def retrieve_context(self, query: TravelQuery) -> Dict[str, Any]:
        """Return search context for *query*, served from Redis when cached.

        Returns:
            A dict containing ``primary_sources`` and ``related_info`` (lists of
            ``{content, location, category, score, source_id}``), plus
            ``location_data`` and ``query_intent``.
        """
        cache_key = self._cache_key(query)
        cached_result = self._cache_get(cache_key)
        if cached_result:
            return json.loads(cached_result)

        search_results = await self.embedding_service.semantic_search(query)

        context = {
            "primary_sources": [],
            "related_info": [],
            "location_data": {},
            "query_intent": await self._classify_query_intent(query),
        }

        for match in search_results:
            metadata = match.metadata or {}
            source_info = {
                "content": metadata.get("content", ""),
                "location": metadata.get("location", ""),
                "category": metadata.get("category", ""),
                "score": match.score,
                "source_id": match.id,
            }
            bucket = (
                "primary_sources" if match.score > self.primary_score_threshold
                else "related_info"
            )
            context[bucket].append(source_info)

        if query.location:
            context["location_data"] = await self._get_realtime_location_data(query.location)

        self._cache_set(cache_key, json.dumps(context, default=str))
        return context

    async def _classify_query_intent(self, query: TravelQuery) -> str:
        """Classify *query* by keyword: dining, accommodation, activities, booking or general."""
        text_lower = query.text.lower()
        for intent, pattern in self._INTENT_PATTERNS:
            if pattern.search(text_lower):
                return intent
        return "general"

    async def _get_realtime_location_data(self, location: str) -> Dict:
        """Return real-time conditions for *location*.

        Placeholder: it currently returns fixed sample values regardless of
        *location*. These values are fed to the LLM prompt as "current conditions".
        """
        # TODO: integrate with weather APIs, event APIs, etc.
        return {
            "weather": "sunny, 25°C",
            "current_events": ["Music festival this weekend"],
            "crowd_levels": "moderate",
            "last_updated": datetime.now().isoformat()
        }
class TravelResponseGenerator:
    """Generate travel answers from retrieved context using an OpenAI chat model.

    Each instance owns its own ``openai.AsyncOpenAI`` client (openai>=1.0 API).
    The previous code had two problems:

    * It called ``openai.ChatCompletion.acreate``, which was removed in openai 1.0.
    * It set the module-global ``openai.api_key``, so two generators with
      different keys overwrote each other.

    Args:
        api_key: OpenAI API key for this generator's client.
        model: Chat model name; defaults to the previously hard-coded ``"gpt-4"``.
    """

    _log = logging.getLogger(__name__)

    # Maximum number of retrieved snippets quoted in the prompt.
    _MAX_PROMPT_SOURCES = 3
    # Length of the content preview returned in RAGResponse.sources.
    _PREVIEW_CHARS = 200

    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model

    async def generate_response(self, query: TravelQuery, context: Dict[str, Any]) -> RAGResponse:
        """Ask the LLM to answer *query*, grounded in *context*.

        Args:
            query: The user's travel query.
            context: Retrieval output; ``primary_sources``, ``related_info`` and
                ``location_data`` are read.

        Returns:
            A ``RAGResponse``. LLM/API failures do not raise: they are logged and
            returned as a zero-confidence response whose ``query_id`` starts
            with ``error_``.
        """
        prompt = self._build_prompt(query, context)

        # Only the network call is guarded. Bugs in local post-processing should
        # surface rather than be disguised as an apology message.
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a knowledgeable travel assistant. Provide helpful, accurate, and personalized travel recommendations."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=1000
            )
        except Exception as e:  # API boundary: callers expect a RAGResponse, not an exception
            self._log.exception("LLM request failed")
            return RAGResponse(
                answer=f"I apologize, but I encountered an error processing your request: {str(e)}",
                sources=[],
                confidence=0.0,
                query_id=f"error_{uuid.uuid4().hex}"
            )

        answer = response.choices[0].message.content or ""
        sources = [self._summarize_source(source) for source in context.get("primary_sources", [])]
        return RAGResponse(
            answer=answer,
            sources=sources,
            confidence=self._calculate_confidence(context),
            # uuid4 rather than a timestamp: concurrent requests could share a timestamp.
            query_id=f"q_{uuid.uuid4().hex}"
        )

    def _summarize_source(self, source: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a retrieved source into the public ``sources`` entry shape."""
        content = source["content"]
        # Only mark truncation when something was actually cut off.
        if len(content) > self._PREVIEW_CHARS:
            content = content[:self._PREVIEW_CHARS] + "..."
        return {
            "content_preview": content,
            "location": source["location"],
            "category": source["category"],
            "relevance_score": source["score"]
        }

    def _build_prompt(self, query: TravelQuery, context: Dict[str, Any]) -> str:
        """Build the user prompt from the query fields and retrieved context."""
        # Use explicit None checks so a budget/duration of 0 is still shown. They
        # also avoid rendering "$Not specified" and "Not specified days".
        budget = f"${query.budget}" if query.budget is not None else "Not specified"
        duration = f"{query.duration} days" if query.duration is not None else "Not specified"

        prompt_parts = [
            f"Travel Query: {query.text}",
            f"Location: {query.location or 'Not specified'}",
            f"Budget: {budget}",
            f"Duration: {duration}",
            f"Preferences: {', '.join(query.preferences) if query.preferences else 'None specified'}",
            "",
            "Relevant Information:",
        ]

        # Prefer high-scoring matches first, then fall back to lower-scoring ones.
        # Without the fallback, the prompt had no grounding at all whenever
        # nothing cleared the primary threshold.
        snippets = (
            list(context.get("primary_sources", [])) + list(context.get("related_info", []))
        )[:self._MAX_PROMPT_SOURCES]
        if snippets:
            prompt_parts.extend(
                f"{i}. {source['content']}" for i, source in enumerate(snippets, start=1)
            )
        else:
            prompt_parts.append("(No matching documents were found.)")

        if context.get("location_data"):
            prompt_parts.append(f"\nCurrent conditions: {context['location_data']}")

        prompt_parts.extend([
            "",
            "Please provide a helpful response that:",
            "1. Directly answers the user's question",
            "2. Includes specific recommendations based on the provided information",
            "3. Considers the user's budget and preferences",
            "4. Mentions any relevant current conditions",
            "5. Suggests practical next steps"
        ])
        return "\n".join(prompt_parts)

    def _calculate_confidence(self, context: Dict[str, Any]) -> float:
        """Heuristic confidence for the primary sources.

        It adds 0.2 per primary source and 0.8 times their mean score, caps the
        total at 1.0, and rounds to 2 decimals. The result is 0 when there are
        no primary sources.
        """
        primary = context.get("primary_sources", [])
        avg_score = np.mean([s["score"] for s in primary] or [0])
        confidence = min(1.0, (len(primary) * 0.2) + (avg_score * 0.8))
        return round(confidence, 2)
# ============================================================================
# RAG PIPELINE ORCHESTRATOR
# ============================================================================
class TravelRAGPipeline:
    """Wire embedding, retrieval and generation into one query pipeline.

    Args:
        openai_api_key: Key handed to the response generator.
    """

    def __init__(self, openai_api_key: str):
        embedder = TravelEmbeddingService()
        self.embedding_service = embedder
        # The retriever shares the pipeline's embedding service, so documents
        # added through add_document are searchable by process_query.
        self.context_retriever = TravelContextRetriever(embedder)
        self.response_generator = TravelResponseGenerator(openai_api_key)

    async def process_query(self, query: TravelQuery) -> RAGResponse:
        """Answer *query*: retrieve context first, then generate from it."""
        retrieved = await self.context_retriever.retrieve_context(query)
        return await self.response_generator.generate_response(query, retrieved)

    async def add_document(self, document: TravelDocument):
        """Embed and index *document* in the knowledge base."""
        await self.embedding_service.store_document(document)
# ============================================================================
# API ABSTRACTION LAYERS
# ============================================================================
import aiohttp
import asyncio
from abc import ABC, abstractmethod
from typing import Union
from datetime import datetime
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
# Base Classes for API Abstraction
class APIProvider(ABC):
    """Common base for HTTP-backed travel API clients.

    Instances are used as async context managers. Entering opens an
    ``aiohttp.ClientSession`` on ``self.session``; exiting closes it.
    """

    def __init__(self, api_key: str, base_url: str):
        self.api_key, self.base_url = api_key, base_url
        self.session = None  # set by __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        open_session = self.session
        if open_session:
            await open_session.close()

    @abstractmethod
    async def search(self, **kwargs):
        """Provider-specific generic search entry point."""
# Flight API Abstraction
class FlightProvider(APIProvider):
    """Base class for flight search providers.

    Concrete providers implement :meth:`search_flights`. This class implements
    the generic :meth:`APIProvider.search` hook by delegating to it. Previously
    nothing in the hierarchy implemented ``search``, so every concrete flight
    provider was still abstract and construction raised ``TypeError``.
    """

    async def search(self, **kwargs) -> Dict[str, Any]:
        """Generic entry point: forward keyword arguments to :meth:`search_flights`."""
        return await self.search_flights(**kwargs)

    @abstractmethod
    async def search_flights(self, origin: str, destination: str,
                             departure_date: str, return_date: Optional[str] = None,
                             passengers: int = 1) -> Dict[str, Any]:
        """Search flights from *origin* to *destination*.

        Returns:
            A normalized dict containing a ``flights`` list and ``total_results``.
        """
class AmadeusFlightProvider(FlightProvider):
    """Amadeus flight-offers search client.

    OAuth access tokens are cached only until shortly before the ``expires_in``
    reported by the token endpoint. They are discarded on HTTP 401, so the retry
    on :meth:`search_flights` re-authenticates instead of replaying a stale
    token. Previously the first token was cached for the object's whole lifetime.
    """

    # Refresh tokens this many seconds early, so no request starts with a token
    # that expires mid-flight.
    _TOKEN_REFRESH_MARGIN_S = 60.0

    def __init__(self, api_key: str, api_secret: str):
        super().__init__(api_key, "https://api.amadeus.com")
        self.api_secret = api_secret
        self.access_token = None
        # time.monotonic() deadline after which access_token is refreshed;
        # None means no expiry is known.
        self._token_expires_at: Optional[float] = None

    def _invalidate_token(self) -> None:
        """Forget the cached token so the next call fetches a new one."""
        self.access_token = None
        self._token_expires_at = None

    async def _get_access_token(self):
        """Return a valid OAuth bearer token, requesting a new one if needed.

        Raises:
            Exception: if the token endpoint responds with a non-200 status.
        """
        if self.access_token and (
            self._token_expires_at is None or time.monotonic() < self._token_expires_at
        ):
            return self.access_token

        auth_url = f"{self.base_url}/v1/security/oauth2/token"
        auth_data = {
            "grant_type": "client_credentials",
            "client_id": self.api_key,
            "client_secret": self.api_secret
        }
        async with self.session.post(auth_url, data=auth_data) as response:
            if response.status != 200:
                raise Exception(f"Failed to get access token: {response.status}")
            token_data = await response.json()

        self.access_token = token_data["access_token"]
        expires_in = token_data.get("expires_in")
        self._token_expires_at = (
            time.monotonic() + float(expires_in) - self._TOKEN_REFRESH_MARGIN_S
            if expires_in is not None else None
        )
        return self.access_token

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    async def search_flights(self, origin: str, destination: str,
                             departure_date: str, return_date: Optional[str] = None,
                             passengers: int = 1) -> Dict[str, Any]:
        """Search flight offers via the Amadeus v2 flight-offers endpoint.

        Up to 3 attempts are made, with exponential backoff between them.

        Raises:
            Exception: on a non-200 response (status and body included).
        """
        token = await self._get_access_token()
        headers = {"Authorization": f"Bearer {token}"}
        params = {
            "originLocationCode": origin,
            "destinationLocationCode": destination,
            "departureDate": departure_date,
            "adults": passengers
        }
        if return_date:
            params["returnDate"] = return_date

        search_url = f"{self.base_url}/v2/shopping/flight-offers"
        async with self.session.get(search_url, headers=headers, params=params) as response:
            if response.status == 200:
                data = await response.json()
                return self._normalize_amadeus_response(data)
            if response.status == 401:
                # The token was expired or revoked server-side; drop it so the
                # retry re-authenticates.
                self._invalidate_token()
            error_text = await response.text()
            raise Exception(f"Amadeus API error: {response.status} - {error_text}")

    def _normalize_amadeus_response(self, raw_response: Dict) -> Dict[str, Any]:
        """Map an Amadeus flight-offers payload onto the shared flight schema."""
        flights = []
        for offer in raw_response.get("data", []):
            flight = {
                "id": offer["id"],
                "price": {
                    "total": float(offer["price"]["total"]),
                    "currency": offer["price"]["currency"]
                },
                "itineraries": [],
                "provider": "amadeus"
            }
            for itinerary in offer["itineraries"]:
                flight_itinerary = {
                    "duration": itinerary["duration"],
                    "segments": []
                }
                for segment in itinerary["segments"]:
                    flight_itinerary["segments"].append({
                        "departure": {
                            "airport": segment["departure"]["iataCode"],
                            "time": segment["departure"]["at"]
                        },
                        "arrival": {
                            "airport": segment["arrival"]["iataCode"],
                            "time": segment["arrival"]["at"]
                        },
                        "airline": segment["carrierCode"],
                        "flight_number": segment["number"]
                    })
                flight["itineraries"].append(flight_itinerary)
            flights.append(flight)
        return {"flights": flights, "total_results": len(flights)}
class SkyscannerFlightProvider(FlightProvider):
    """Skyscanner (RapidAPI "browse quotes") flight client.

    NOTE: ``_normalize_skyscanner_response`` is still a stub that discards the
    payload, so this provider currently contributes zero flights. ``passengers``
    is accepted for interface compatibility but is not sent to the API.
    """

    def __init__(self, api_key: str):
        super().__init__(api_key, "https://skyscanner-skyscanner-flight-search-v1.p.rapidapi.com")

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    async def search_flights(self, origin: str, destination: str,
                             departure_date: str, return_date: str = None,
                             passengers: int = 1) -> Dict[str, Any]:
        """Query browse-quotes for the route; up to 3 attempts with backoff.

        Raises:
            Exception: on a non-200 response (status and body included).
        """
        rapidapi_headers = {
            "X-RapidAPI-Key": self.api_key,
            "X-RapidAPI-Host": "skyscanner-skyscanner-flight-search-v1.p.rapidapi.com"
        }
        # Path-style parameters: market/currency/locale/origin/destination/outbound[/inbound]
        path_parts = [
            self.base_url,
            "apiservices/browsequotes/v1.0/US/USD/en-US",
            origin,
            destination,
            departure_date,
        ]
        if return_date:
            path_parts.append(return_date)
        url = "/".join(path_parts)

        async with self.session.get(url, headers=rapidapi_headers) as resp:
            if resp.status != 200:
                body = await resp.text()
                raise Exception(f"Skyscanner API error: {resp.status} - {body}")
            payload = await resp.json()
        return self._normalize_skyscanner_response(payload)

    def _normalize_skyscanner_response(self, raw_response: Dict) -> Dict[str, Any]:
        """Convert a Skyscanner payload to the shared flight schema (stub)."""
        # TODO: map quotes/carriers/places the way the Amadeus normalizer does;
        # until then the payload is discarded.
        return {"flights": [], "total_results": 0}
# Hotel API Abstraction
class HotelProvider(APIProvider):
    """Base class for hotel search providers.

    Concrete providers implement :meth:`search_hotels`. This class implements
    the generic :meth:`APIProvider.search` hook by delegating to it. Previously
    nothing in the hierarchy implemented ``search``, so every concrete hotel
    provider was still abstract and construction raised ``TypeError``.
    """

    async def search(self, **kwargs) -> Dict[str, Any]:
        """Generic entry point: forward keyword arguments to :meth:`search_hotels`."""
        return await self.search_hotels(**kwargs)

    @abstractmethod
    async def search_hotels(self, location: str, checkin: str, checkout: str,
                            guests: int = 2, rooms: int = 1) -> Dict[str, Any]:
        """Search hotels in *location* for the given stay.

        Returns:
            A normalized dict containing a ``hotels`` list and ``total_results``.
        """
class BookingComHotelProvider(HotelProvider):
    """Booking.com (RapidAPI) hotel search client."""

    def __init__(self, api_key: str):
        super().__init__(api_key, "https://booking-com.p.rapidapi.com")

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    async def search_hotels(self, location: str, checkin: str, checkout: str,
                            guests: int = 2, rooms: int = 1) -> Dict[str, Any]:
        """Search hotels in *location* for the given stay.

        *location* is resolved to a Booking.com destination ID first. Up to 3
        attempts are made, with exponential backoff.

        Raises:
            Exception: if the location lookup or the search request fails.
        """
        headers = {
            "X-RapidAPI-Key": self.api_key,
            "X-RapidAPI-Host": "booking-com.p.rapidapi.com"
        }
        dest_id = await self._get_destination_id(location, headers)

        params = {
            "dest_id": dest_id,
            "dest_type": "city",
            "checkin_date": checkin,
            "checkout_date": checkout,
            "adults_number": guests,
            "room_number": rooms,
            "units": "metric",
            "order_by": "popularity"
        }
        search_url = f"{self.base_url}/v1/hotels/search"
        async with self.session.get(search_url, headers=headers, params=params) as response:
            if response.status == 200:
                data = await response.json()
                return self._normalize_booking_response(data)
            error_text = await response.text()
            raise Exception(f"Booking.com API error: {response.status} - {error_text}")

    async def _get_destination_id(self, location: str, headers: Dict) -> str:
        """Return the ``dest_id`` of the first location match for *location*.

        Raises:
            Exception: on a non-200 response, with the status and body included
                (the old code reported every failure as "not found"), or when
                no location matches.
        """
        search_url = f"{self.base_url}/v1/hotels/locations"
        params = {"name": location, "locale": "en-gb"}
        async with self.session.get(search_url, headers=headers, params=params) as response:
            if response.status != 200:
                error_text = await response.text()
                raise Exception(
                    f"Booking.com location lookup failed for {location}: "
                    f"{response.status} - {error_text}"
                )
            data = await response.json()
        if not data:
            raise Exception(f"Could not find destination ID for {location}")
        return str(data[0]["dest_id"])

    def _normalize_booking_response(self, raw_response: Dict) -> Dict[str, Any]:
        """Map a Booking.com search payload onto the shared hotel schema."""
        # `x.get(k) or default` is used instead of `x.get(k, default)`: a key that is
        # present with a null value would otherwise leak None. The aggregator sorts
        # on rating, and comparing None with a number raises TypeError.
        hotels = []
        for hotel_data in raw_response.get("result", []):
            hotels.append({
                "id": hotel_data["hotel_id"],
                "name": hotel_data["hotel_name"],
                "price": hotel_data.get("min_total_price") or 0,
                "currency": hotel_data.get("currency_code") or "USD",
                "rating": hotel_data.get("review_score") or 0,
                "location": {
                    "address": hotel_data.get("address") or "",
                    "city": hotel_data.get("city") or "",
                    "coordinates": {
                        "lat": hotel_data.get("latitude") or 0,
                        "lng": hotel_data.get("longitude") or 0
                    }
                },
                "amenities": hotel_data.get("hotel_facilities") or [],
                "provider": "booking.com"
            })
        return {"hotels": hotels, "total_results": len(hotels)}
# ============================================================================
# UNIFIED API SERVICE
# ============================================================================
class TravelAPIService:
    """Aggregate flight and hotel searches across registered providers.

    Each provider is searched concurrently inside its own ``async with`` block,
    so its HTTP session stays open until that provider's search completes.
    Previously the coroutines were created inside ``async with`` but awaited
    only after every block had exited, i.e. against already-closed sessions.

    Aggregated results are cached in Redis on a best-effort basis. Cache errors
    are logged, never raised, and a result in which every provider failed is
    not cached.

    Args:
        cache: Optional Redis client; defaults to ``localhost:6379`` db 1.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, cache: Optional[redis.Redis] = None):
        self.flight_providers = []
        self.hotel_providers = []
        self.cache = cache if cache is not None else redis.Redis(host='localhost', port=6379, db=1)

    def add_flight_provider(self, provider: FlightProvider):
        """Register a flight API provider."""
        self.flight_providers.append(provider)

    def add_hotel_provider(self, provider: HotelProvider):
        """Register a hotel API provider."""
        self.hotel_providers.append(provider)

    def _cache_get(self, key: str) -> Optional[Dict[str, Any]]:
        """Return the cached result for *key* marked ``cached: True``, or None."""
        try:
            raw = self.cache.get(key)
        except redis.RedisError:
            self._log.warning("Cache read failed for %s", key, exc_info=True)
            return None
        if not raw:
            return None
        result = json.loads(raw)
        result["cached"] = True
        return result

    def _cache_set(self, key: str, value: Dict[str, Any], ttl: int) -> None:
        """Cache *value* under *key* for *ttl* seconds; log Redis errors."""
        try:
            self.cache.setex(key, ttl, json.dumps(value, default=str))
        except redis.RedisError:
            self._log.warning("Cache write failed for %s", key, exc_info=True)

    async def _gather_from(self, providers, call, kind: str):
        """Run ``call(provider)`` concurrently for every provider.

        Returns:
            ``(results, errors)``: the successful return values, plus one
            ``"Provider <i> (<ClassName>): <error>"`` string per failure.
        """
        async def run_one(provider):
            # Keep the session open for the whole search.
            async with provider:
                return await call(provider)

        outcomes = await asyncio.gather(*(run_one(p) for p in providers), return_exceptions=True)
        results, errors = [], []
        for i, (provider, outcome) in enumerate(zip(providers, outcomes)):
            # BaseException, not Exception: on Python 3.8+, gather also returns
            # CancelledError, which does not subclass Exception.
            if isinstance(outcome, BaseException):
                name = type(provider).__name__
                errors.append(f"Provider {i} ({name}): {outcome}")
                self._log.error("%s provider %d (%s) failed: %s", kind, i, name, outcome)
            else:
                results.append(outcome)
        return results, errors

    async def search_flights_aggregated(self, origin: str, destination: str,
                                        departure_date: str, return_date: Optional[str] = None,
                                        passengers: int = 1) -> Dict[str, Any]:
        """Search flights across all providers, merged and sorted by total price.

        Results are cached for 5 minutes.
        """
        cache_key = f"flights:{origin}:{destination}:{departure_date}:{return_date}:{passengers}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached

        results, errors = await self._gather_from(
            self.flight_providers,
            lambda p: p.search_flights(origin, destination, departure_date, return_date, passengers),
            "Flight",
        )
        all_flights = [flight for result in results for flight in result["flights"]]
        all_flights.sort(key=lambda x: x["price"]["total"])

        aggregated_result = {
            "flights": all_flights,
            "total_results": len(all_flights),
            "providers_used": len(self.flight_providers),
            "errors": errors,
            "cached": False
        }
        # Do not pin a total outage in the cache for the whole TTL.
        if results:
            self._cache_set(cache_key, aggregated_result, 300)
        return aggregated_result

    async def search_hotels_aggregated(self, location: str, checkin: str,
                                       checkout: str, guests: int = 2,
                                       rooms: int = 1) -> Dict[str, Any]:
        """Search hotels across all providers, de-duplicated and sorted by rating.

        Results are cached for 30 minutes.
        """
        cache_key = f"hotels:{location}:{checkin}:{checkout}:{guests}:{rooms}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached

        results, errors = await self._gather_from(
            self.hotel_providers,
            lambda p: p.search_hotels(location, checkin, checkout, guests, rooms),
            "Hotel",
        )
        # IDs are only unique per provider, so de-duplicate on (provider, id).
        unique_hotels = {
            (hotel.get("provider"), hotel["id"]): hotel
            for result in results
            for hotel in result["hotels"]
        }
        # Treat a missing rating as 0 so the sort never compares None with a number.
        sorted_hotels = sorted(unique_hotels.values(), key=lambda h: h["rating"] or 0, reverse=True)

        aggregated_result = {
            "hotels": sorted_hotels,
            "total_results": len(sorted_hotels),
            "providers_used": len(self.hotel_providers),
            "errors": errors,
            "cached": False
        }
        if results:
            self._cache_set(cache_key, aggregated_result, 1800)
        return aggregated_result
# ============================================================================
# USAGE EXAMPLES
# ============================================================================
async def main():
    """Demo both subsystems: the RAG pipeline and the aggregated API search.

    Credentials come from the environment variables ``OPENAI_API_KEY``,
    ``AMADEUS_API_KEY``, ``AMADEUS_API_SECRET``, ``SKYSCANNER_API_KEY`` and
    ``BOOKING_API_KEY``. Each falls back to its original placeholder string.
    """
    # ---- RAG pipeline ------------------------------------------------------
    try:
        rag_pipeline = TravelRAGPipeline(os.environ.get("OPENAI_API_KEY", "your-openai-api-key"))

        # Document locations must match the query's location exactly ("Seoul",
        # not "seoul"). semantic_search filters with an exact $eq match, so a
        # lowercase entry is never retrieved for a "Seoul" query.
        sample_docs = [
            TravelDocument(
                id="seoul_family_1",
                content="Seoul tower is perfect for families with young children. Features kid-friendly rides and stroller rentals.",
                location="Seoul",
                category="attraction",
                metadata={"age_group": "family", "duration": "full_day", "cost": "expensive"}
            ),
            TravelDocument(
                id="seoul_family_2",
                content="Palaces in Seoul is great for kids under 10. Entry fee is very reasonable.",
                location="Seoul",
                category="attraction",
                metadata={"age_group": "family", "duration": "half_day", "cost": "budget"}
            )
        ]
        for doc in sample_docs:
            await rag_pipeline.add_document(doc)

        travel_query = TravelQuery(
            text="Best family-friendly activities in Seoul with kids under 10",
            location="Seoul",
            budget=500.0,
            duration=3,
            preferences=["family-friendly", "educational", "outdoor"]
        )
        rag_response = await rag_pipeline.process_query(travel_query)
        print("RAG Response:")
        print(f"Answer: {rag_response.answer}")
        print(f"Confidence: {rag_response.confidence}")
        print(f"Sources: {len(rag_response.sources)}")
    except Exception as e:  # demo boundary: report the failure and still run the API demo
        print(f"Error in RAG pipeline: {e}")

    # ---- API aggregation -------------------------------------------------
    api_service = TravelAPIService()
    api_service.add_flight_provider(AmadeusFlightProvider(
        os.environ.get("AMADEUS_API_KEY", "amadeus_key"),
        os.environ.get("AMADEUS_API_SECRET", "amadeus_secret"),
    ))
    api_service.add_flight_provider(SkyscannerFlightProvider(
        os.environ.get("SKYSCANNER_API_KEY", "skyscanner_key")
    ))
    api_service.add_hotel_provider(BookingComHotelProvider(
        os.environ.get("BOOKING_API_KEY", "booking_key")
    ))

    # Use dates relative to today. The previously hard-coded 2024 dates are in
    # the past, and travel search APIs would reject them.
    departure = datetime.now().date() + timedelta(days=30)
    return_day = departure + timedelta(days=7)

    try:
        flight_results = await api_service.search_flights_aggregated(
            origin="NYC",
            destination="LAX",
            departure_date=departure.isoformat(),
            return_date=return_day.isoformat(),
            passengers=2
        )
        print(f"\nFound {flight_results['total_results']} flights from {flight_results['providers_used']} providers")

        hotel_results = await api_service.search_hotels_aggregated(
            location="Los Angeles",
            checkin=departure.isoformat(),
            checkout=return_day.isoformat(),
            guests=2,
            rooms=1
        )
        print(f"Found {hotel_results['total_results']} hotels from {hotel_results['providers_used']} providers")
    except Exception as e:
        print(f"Error in API calls: {e}")


if __name__ == "__main__":
    asyncio.run(main())