Se realiza una nueva implmentacion para la autenticacion con la api de Google.

This commit is contained in:
Mongar28
2024-11-27 02:13:30 -05:00
parent 86b3fc8d0e
commit 283da64cd3
16 changed files with 624 additions and 4640 deletions

48
app/google_auth.py Normal file
View File

@@ -0,0 +1,48 @@
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
import os.path
import json
# Si modificas estos scopes, elimina el archivo token.json
SCOPES = [
'https://www.googleapis.com/auth/calendar', # Full access to calendar
'https://www.googleapis.com/auth/gmail.readonly', # Read Gmail profile
'https://www.googleapis.com/auth/gmail.send', # Send emails
'https://www.googleapis.com/auth/calendar.events', # Manage calendar events
'https://www.googleapis.com/auth/calendar.readonly' # Read calendar events
]
def get_google_credentials(scopes=None, credentials_file='google_credentials.json'):
"""Obtiene las credenciales de Google desde el token pre-generado.
Args:
scopes: Lista opcional de scopes de Google API
credentials_file: No se usa, mantenido por compatibilidad
Returns:
Google OAuth2 credentials
"""
# Si se proporcionan scopes, usarlos; si no, usar los predeterminados
scopes_to_use = scopes if scopes else SCOPES
# Ruta al token pre-generado
token_path = os.path.join('tokens', 'gmail_token.json')
if not os.path.exists(token_path):
raise FileNotFoundError(
f"No se encontró el archivo {token_path}. "
"Ejecuta generate_token.py fuera del Docker primero."
)
try:
# Cargar credenciales desde el token
creds = Credentials.from_authorized_user_file(token_path, scopes_to_use)
# Si el token está expirado, intentar refrescarlo
if creds.expired:
creds.refresh(Request())
return creds
except Exception as e:
raise Exception(f"Error al cargar o refrescar el token: {e}")

View File

@@ -10,8 +10,9 @@ from app.rag.split_docs import load_split_docs
from app.rag.llm import load_llm_openai
from app.rag.embeddings import load_embeddins
from app.rag.retriever import create_retriever
from app.rag.vectorstore import create_verctorstore
from app.rag.vectorstore import create_vectorstore
from app.rag.rag_chain import create_rag_chain
from app.google_auth import get_google_credentials
import pytz
import telebot
import os
@@ -31,6 +32,122 @@ class LangChainTools:
)
return llm
def list_calendar_events(self, max_results: int = 50) -> list:
"""Lista los próximos eventos del calendario."""
try:
# Usar el sistema centralizado de autenticación
creds = get_google_credentials()
# Construir el servicio de Calendar
service = build('calendar', 'v3', credentials=creds)
# Obtener eventos próximos
now = datetime.now(timezone.utc).isoformat()
events_result = service.events().list(
calendarId='primary',
timeMin=now,
maxResults=max_results,
singleEvents=True,
orderBy='startTime'
).execute()
events = events_result.get('items', [])
if not events:
print('No se encontraron eventos próximos.')
return []
# Formatear la salida de eventos
formatted_events = []
for event in events:
start = event['start'].get('dateTime', event['start'].get('date'))
formatted_events.append({
'start': start,
'summary': event['summary'],
'id': event['id'],
'link': event.get('htmlLink')
})
print(f"{start} - {event['summary']}")
return formatted_events
except Exception as e:
print(f"Error al listar eventos: {e}")
return {"error": str(e)}
def create_calendar_event(
self, title: str, start_time: datetime,
end_time: datetime, attendees: list = None) -> dict:
"""Crea un evento en el calendario."""
try:
# Usar el sistema centralizado de autenticación
creds = get_google_credentials()
# Construir el servicio de Calendar
service = build('calendar', 'v3', credentials=creds)
# Identificador del calendario principal
calendar_id = 'primary'
# Definir el evento
event = {
'summary': title,
'start': {
'dateTime': start_time.strftime('%Y-%m-%dT%H:%M:%S'),
'timeZone': 'America/Bogota',
},
'end': {
'dateTime': end_time.strftime('%Y-%m-%dT%H:%M:%S'),
'timeZone': 'America/Bogota',
},
}
# Agregar asistentes si se proporcionan
if attendees and isinstance(attendees, list) and len(attendees) > 0:
# Asegurarse de que cada asistente tenga un correo válido
attendees_list = []
for attendee in attendees:
if isinstance(attendee, str) and '@' in attendee:
attendees_list.append({'email': attendee.strip()})
if attendees_list:
event['attendees'] = attendees_list
# Enviar invitaciones por correo
event['sendUpdates'] = 'all'
# Crear el evento
event = service.events().insert(calendarId=calendar_id, body=event).execute()
print(f'Evento creado: {event.get("htmlLink")}')
return event
except Exception as e:
print(f"Error al crear el evento: {e}")
return {"error": str(e)}
def create_quick_add_event(self, quick_add_text: str) -> dict:
"""Crea un evento rápidamente usando texto en lenguaje natural."""
try:
# Usar el sistema centralizado de autenticación
creds = get_google_credentials()
# Construir el servicio de Calendar
service = build('calendar', 'v3', credentials=creds)
# Identificador del calendario principal
calendar_id = 'primary'
# Crear el evento usando quick add
event = service.events().quickAdd(
calendarId=calendar_id,
text=quick_add_text
).execute()
print(f'Evento creado: {event.get("htmlLink")}')
return event
except Exception as e:
print(f"Error al crear el evento rápido: {e}")
return {"error": str(e)}
@tool
def multiply(first_int: int, second_int: int) -> int:
@@ -57,232 +174,6 @@ def redact_email(topic: str) -> str:
return response
@tool
def list_calendar_events(max_results: int = 50) -> list:
"""Use this tool to list upcoming calendar events."""
# Define los alcances que necesitamos para acceder a
# la API de Google Calendar
SCOPES = ['https://www.googleapis.com/auth/calendar']
creds = None
# La ruta al archivo token.json, que contiene
# los tokens de acceso y actualización
token_path = 'token.json'
# La ruta al archivo de credenciales de OAuth 2.1
creds_path = 'credentials.json'
# Cargar las credenciales desde el archivo token.json, si existe
if os.path.exists(token_path):
creds = Credentials.from_authorized_user_file(token_path, SCOPES)
# Si no hay credenciales válidas disponibles, inicia el flujo de OAuth 2.0
# para obtener nuevas credenciales
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
creds_path, SCOPES)
creds = flow.run_local_server(port=0)
# Guarda las credenciales para la próxima ejecución
with open(token_path, 'w') as token_file:
token_file.write(creds.to_json())
# Construye el objeto de servicio para interactuar
# con la API de Google Calendar
service = build('calendar', 'v3', credentials=creds)
# Identificador del calendario que deseas consultar.
# 'primary' se refiere al calendario principal del usuario.
calendar_id = 'primary'
# Realiza una llamada a la API para obtener una lista de eventos.
now = datetime.now(timezone.utc).isoformat() # 'Z' indica UTC
events_result = service.events().list(
calendarId=calendar_id, timeMin=now,
maxResults=max_results, singleEvents=True,
orderBy='startTime').execute()
# Extrae los eventos de la respuesta de la API.
events = events_result.get('items', [])
# Si no se encuentran eventos, imprime un mensaje.
if not events:
print('No upcoming events found.')
return
# Recorre la lista de eventos y muestra la hora de inicio
# y el resumen de cada evento.
for event in events:
# Obtiene la fecha y hora de inicio del evento.
# Puede ser 'dateTime' o 'date'.
start = event['start'].get('dateTime', event['start'].get('date'))
# Imprime la hora de inicio y el resumen (título) del evento.
print(start, event['summary'])
return events
@tool
def create_calendar_event(
title: str, start_time: datetime,
end_time: datetime, attendees: list) -> dict:
"""Use this tool to create an event in the calendar.
Parameters:
- title: str - The title of the event.
- start_time: datetime - The start time of the event.
- end_time: datetime - The end time of the event.
- attendees: list - A list of attendee emails (required).
Returns:
- dict - The created event details.
"""
if not attendees:
raise ValueError(
"El campo 'attendees' es obligatorio y no puede estar vacío.")
SCOPES = ['https://www.googleapis.com/auth/calendar']
creds = None
# La ruta al archivo token.json,
# que contiene los tokens de acceso y actualización
token_path = 'token_2.json'
# La ruta al archivo de credenciales de OAuth 2.0
creds_path = 'credentials_2.json'
# Cargar las credenciales desde el archivo token.json, si existe
if os.path.exists(token_path):
creds = Credentials.from_authorized_user_file(token_path, SCOPES)
# Si no hay credenciales válidas disponibles,
# inicia el flujo de OAuth 2.0 para obtener nuevas credenciales
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
creds_path, SCOPES)
creds = flow.run_local_server(port=0)
# Guarda las credenciales para la próxima ejecución
with open(token_path, 'w') as token_file:
token_file.write(creds.to_json())
# Construye el objeto de servicio para
# interactuar con la API de Google Calendar
service = build('calendar', 'v3', credentials=creds)
# Validar y filtrar asistentes
valid_attendees = []
for email in attendees:
if isinstance(email, str) and '@' in email:
valid_attendees.append({'email': email})
else:
raise ValueError(f"'{email}' no es un correo electrónico válido.")
# Identificador del calendario que deseas modificar.
# 'primary' se refiere al calendario principal del usuario.
calendar_id = 'primary'
# Define el cuerpo del evento con el título,
# la hora de inicio y la hora de finalización
event = {
'summary': title,
'start': {
'dateTime': start_time.strftime('%Y-%m-%dT%H:%M:%S'),
'timeZone': 'America/Bogota',
},
'end': {
'dateTime': end_time.strftime('%Y-%m-%dT%H:%M:%S'),
'timeZone': 'America/Bogota',
},
'attendees': valid_attendees
}
try:
# Crea el evento en el calendario
event = service.events().insert(
calendarId=calendar_id, body=event).execute()
print('Event created: %s' % (event.get('htmlLink')))
except Exception as e:
print(f"Error al crear el evento: {e}")
return {}
return event
@tool
def create_quick_add_event(quick_add_text: str):
"""
Use this tool to create events in the calendar from natural language,
using the Quick Add feature of Google Calendar.
"""
quick_add_text: str = input(
"- Escribe la descripcion del evento que quieres crear: ")
SCOPES = ['https://www.googleapis.com/auth/calendar']
creds = None
# La ruta al archivo token.json,
# que contiene los tokens de acceso y actualización
token_path = 'token_2.json'
# La ruta al archivo de credenciales de OAuth 2.0
creds_path = 'credentials_2.json'
# Cargar las credenciales desde el archivo token.json, si existe
if os.path.exists(token_path):
creds = Credentials.from_authorized_user_file(token_path, SCOPES)
# Si no hay credenciales válidas disponibles,
# inicia el flujo de OAuth 2.0 para obtener nuevas credenciales
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
creds_path, SCOPES)
creds = flow.run_local_server(port=0)
# Guarda las credenciales para la próxima ejecución
with open(token_path, 'w') as token_file:
token_file.write(creds.to_json())
# Construye el objeto de servicio para interactuar
# con la API de Google Calendar
service = build('calendar', 'v3', credentials=creds)
# Identificador del calendario que deseas modificar.
# 'primary' se refiere al calendario principal del usuario.
calendar_id = 'primary'
# Crea el evento utilizando la funcionalidad Quick Add
event = service.events().quickAdd(
calendarId=calendar_id, text=quick_add_text).execute()
print('Event created: %s' % (event.get('htmlLink')))
return event
# @tool
# def send_message(message: str):
# """Use this function when you need to communicate with the user."""
# # Configuración del bot
# load_dotenv()
# API_TOKEN_BOT = os.getenv("API_TOKEN_BOT")
# bot = telebot.TeleBot(API_TOKEN_BOT)
#
# bot.send_message(chat_id="5076346205", text=message)
#
@tool
def send_message(message: str):
"""Use this function when you need to communicate with Cristian."""
@@ -308,29 +199,32 @@ def get_company_info(prompt: str) -> str:
"""
file_path: str = 'onecluster_info.pdf'
docs_split: list = load_split_docs(file_path)
embeddings_model = load_embeddins()
llm = load_llm_openai()
create_verctorstore(
docs_split,
embeddings_model,
file_path
)
retriever = create_retriever(
embeddings_model,
persist_directory="embeddings/onecluster_info"
)
qa = create_rag_chain(
llm, retriever)
try:
docs_split: list = load_split_docs(file_path)
embeddings_model = load_embeddins()
llm = load_llm_openai()
# Usar el nombre corregido de la función
create_vectorstore(
docs_split,
embeddings_model,
file_path
)
retriever = create_retriever(
embeddings_model,
persist_directory="embeddings/onecluster_info"
)
qa = create_rag_chain(llm, retriever)
# prompt: str = "Escribe un parrarfo describiendo cuantos son y
# cuales son los servicios que ofrece OneCluster
# y brinda detalles sobre cada uno."
response = qa.invoke(
{"input": prompt, "chat_history": []}
)
response = qa.invoke(
{"input": prompt, "chat_history": []}
)
return response["answer"]
return response["answer"]
except Exception as e:
print(f"Error en get_company_info: {e}")
return f"Lo siento, hubo un error al procesar la información: {str(e)}"
@tool
@@ -344,3 +238,123 @@ def get_current_date_and_time():
bogota_tz = pytz.timezone('America/Bogota')
current_date_and_time = datetime.now(bogota_tz)
return current_date_and_time.strftime('%Y-%m-%d %H:%M:%S')
@tool
def list_calendar_events(max_results: int = 50) -> list:
"""Use this tool to list upcoming calendar events."""
try:
# Usar el sistema centralizado de autenticación
creds = get_google_credentials()
# Construir el servicio de Calendar
service = build('calendar', 'v3', credentials=creds)
# Obtener eventos próximos
now = datetime.now(timezone.utc).isoformat()
events_result = service.events().list(
calendarId='primary',
timeMin=now,
maxResults=max_results,
singleEvents=True,
orderBy='startTime'
).execute()
events = events_result.get('items', [])
if not events:
print('No se encontraron eventos próximos.')
return []
# Formatear la salida de eventos
formatted_events = []
for event in events:
start = event['start'].get('dateTime', event['start'].get('date'))
formatted_events.append({
'start': start,
'summary': event['summary'],
'id': event['id'],
'link': event.get('htmlLink')
})
print(f"{start} - {event['summary']}")
return formatted_events
except Exception as e:
print(f"Error al listar eventos: {e}")
return {"error": str(e)}
@tool
def create_calendar_event(
title: str, start_time: datetime,
end_time: datetime, attendees: list = None) -> dict:
"""Use this tool to create an event in the calendar."""
try:
# Usar el sistema centralizado de autenticación
creds = get_google_credentials()
# Construir el servicio de Calendar
service = build('calendar', 'v3', credentials=creds)
# Identificador del calendario principal
calendar_id = 'primary'
# Definir el evento
event = {
'summary': title,
'start': {
'dateTime': start_time.strftime('%Y-%m-%dT%H:%M:%S'),
'timeZone': 'America/Bogota',
},
'end': {
'dateTime': end_time.strftime('%Y-%m-%dT%H:%M:%S'),
'timeZone': 'America/Bogota',
},
}
# Agregar asistentes si se proporcionan
if attendees and isinstance(attendees, list) and len(attendees) > 0:
# Asegurarse de que cada asistente tenga un correo válido
attendees_list = []
for attendee in attendees:
if isinstance(attendee, str) and '@' in attendee:
attendees_list.append({'email': attendee.strip()})
if attendees_list:
event['attendees'] = attendees_list
# Enviar invitaciones por correo
event['sendUpdates'] = 'all'
# Crear el evento
event = service.events().insert(calendarId=calendar_id, body=event).execute()
print(f'Evento creado: {event.get("htmlLink")}')
return event
except Exception as e:
print(f"Error al crear el evento: {e}")
return {"error": str(e)}
@tool
def create_quick_add_event(quick_add_text: str) -> dict:
"""Use this tool to create events in the calendar from natural language."""
try:
# Usar el sistema centralizado de autenticación
creds = get_google_credentials()
# Construir el servicio de Calendar
service = build('calendar', 'v3', credentials=creds)
# Identificador del calendario principal
calendar_id = 'primary'
# Crear el evento usando quick add
event = service.events().quickAdd(
calendarId=calendar_id,
text=quick_add_text
).execute()
print(f'Evento creado: {event.get("htmlLink")}')
return event
except Exception as e:
print(f"Error al crear el evento rápido: {e}")
return {"error": str(e)}

View File

@@ -2,16 +2,18 @@ from langchain_chroma import Chroma
import os
def create_verctorstore(docs_split: list, embeddings, file_name: str):
def create_vectorstore(docs_split: list, embeddings, file_name: str):
db_name: str = file_name.replace(".pdf", "").replace(" ", "_").lower()
persist_directory: str = f"embeddings/{db_name}"
if not os.path.exists(persist_directory):
vectordb = Chroma.from_documents(
persist_directory=persist_directory,
documents=docs_split,
embedding=embeddings,
)
# Crear el directorio si no existe
os.makedirs(persist_directory, exist_ok=True)
return vectordb
# Siempre crear/actualizar el vectorstore
vectordb = Chroma.from_documents(
persist_directory=persist_directory,
documents=docs_split,
embedding=embeddings,
)
return vectordb

View File

@@ -7,8 +7,7 @@ from langchain_core.prompts import ChatPromptTemplate
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.tools.gmail.utils import (
build_resource_service,
get_gmail_credentials)
build_resource_service)
from langchain_community.agent_toolkits import GmailToolkit
from app.langchain_tools.agent_tools import (
@@ -26,6 +25,8 @@ from langgraph.checkpoint.memory import MemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from dotenv import load_dotenv
from app.google_auth import get_google_credentials
import os
load_dotenv()
@@ -37,17 +38,27 @@ llm = ChatOpenAI(
# # Configuración de Gmail
toolkit = GmailToolkit()
credentials = get_gmail_credentials(
token_file="token.json",
scopes=["https://mail.google.com/"],
client_secrets_file="credentials.json",
)
api_resource = build_resource_service(credentials=credentials)
toolkit = GmailToolkit(api_resource=api_resource)
toolkit = None
try:
token_path = os.path.join('tokens', 'gmail_token.json')
credentials = get_google_credentials(
scopes=[
"https://mail.google.com/",
"https://www.googleapis.com/auth/calendar",
"https://www.googleapis.com/auth/calendar.events"
],
credentials_file="google_credentials.json"
)
api_resource = build_resource_service(credentials=credentials)
toolkit = GmailToolkit(api_resource=api_resource)
except Exception as e:
print(f"Warning: Could not initialize Gmail toolkit: {e}")
# # Crear herramientas
tools = toolkit.get_tools()
tools = []
if toolkit:
tools.extend(toolkit.get_tools())
search = TavilySearchResults(max_results=2)
tools.extend([
search, redact_email, list_calendar_events,
@@ -156,19 +167,26 @@ async def process_text(request: Request):
if __name__ == "__main__":
config = {"configurable": {"thread_id": "thread-1", "recursion_limit": 50}}
# Modo interactivo por defecto
import sys
if "--server" not in sys.argv:
while True:
user_input = input("User: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
while True:
user_input = input("User: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
events = graph.stream({
"messages": [("user", user_input)],
"is_last_step": False},
config, stream_mode="updates")
events = graph.stream({
"messages": [("user", user_input)],
"is_last_step": False},
config, stream_mode="updates")
for event in events:
if "agent" in event:
print(
f"\nAsistente: {event['agent']['messages'][-1].content}\n")
for event in events:
if "agent" in event:
print(
f"\nAsistente: {event['agent']['messages'][-1].content}\n")
else:
# Modo servidor con uvicorn
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)

54
app/test_google_auth.py Normal file
View File

@@ -0,0 +1,54 @@
import unittest
from unittest.mock import patch, MagicMock
from google.oauth2.credentials import Credentials
from app.google_auth import get_google_credentials
import tempfile
import shutil
import os
class TestGoogleAuth(unittest.TestCase):
"""Test cases for Google authentication module."""
def setUp(self):
"""Set up test fixtures."""
self.mock_creds = MagicMock(spec=Credentials)
self.test_scopes = ['https://www.googleapis.com/auth/gmail.readonly']
self.test_dir = tempfile.mkdtemp()
self.token_path = os.path.join(
self.test_dir, 'tokens', 'gmail_token.json'
)
def tearDown(self):
"""Clean up after tests."""
shutil.rmtree(self.test_dir)
@patch('os.path.exists')
@patch('google.oauth2.credentials.Credentials.from_authorized_user_file')
def test_get_credentials_from_token(self, mock_creds, mock_exists):
"""Test getting credentials from existing token file."""
mock_exists.return_value = True
mock_creds.return_value = self.mock_creds
self.mock_creds.valid = True
creds = get_google_credentials(self.test_scopes)
self.assertEqual(creds, self.mock_creds)
@patch('os.path.exists')
@patch('google.oauth2.credentials.Credentials.from_authorized_user_file')
def test_refresh_expired_credentials(self, mock_creds, mock_exists):
"""Test refreshing expired credentials."""
mock_exists.return_value = True
mock_creds.return_value = self.mock_creds
self.mock_creds.valid = False
self.mock_creds.expired = True
self.mock_creds.refresh_token = True
self.mock_creds.to_json.return_value = '{"mock": "credentials"}'
creds = get_google_credentials(self.test_scopes)
self.assertTrue(self.mock_creds.refresh.called)
self.assertEqual(creds, self.mock_creds)
if __name__ == '__main__':
unittest.main()