updated README.md
pull/45/head · jacobkrakai · 1 year ago
parent 2eda997810 · commit 1196719bbc

# import libraries
import os
import re
import shutil
import urllib.request
from pathlib import Path
from tempfile import NamedTemporaryFile

import fitz
import numpy as np
import openai
import tensorflow_hub as hub  # provides hub.load for the Universal Sentence Encoder
from fastapi import UploadFile
from lcserve import serving
from sklearn.neighbors import NearestNeighbors
# download a pdf from the given url to output_path
def download_pdf(url, output_path):
    urllib.request.urlretrieve(url, output_path)


# collapse newlines and runs of whitespace into single spaces
def preprocess(text):
    text = text.replace("\n", " ")
    text = re.sub(r"\s+", " ", text)  # raw string avoids an invalid-escape warning
    return text
# convert a pdf into a list of preprocessed page texts
def pdf_to_text(path, start_page=1, end_page=None):
    doc = fitz.open(path)
    total_pages = doc.page_count

    # if the end page is not specified, read to the last page
    if end_page is None:
        end_page = total_pages

    text_list = []

    # loop through the requested pages and extract their text
    for i in range(start_page - 1, end_page):
        text = doc.load_page(i).get_text("text")
        text = preprocess(text)
        text_list.append(text)

    doc.close()
    return text_list
# split the page texts into word chunks tagged with page numbers
def text_to_chunks(texts, word_length=150, start_page=1):
    text_toks = [t.split(" ") for t in texts]
    page_nums = []
    chunks = []

    # walk the words of each page, emitting fixed-size chunks
    for idx, words in enumerate(text_toks):
        for i in range(0, len(words), word_length):
            chunk = words[i : i + word_length]
            # if the trailing chunk of a page is shorter than word_length and
            # this is not the last page, carry it over to the next page
            if (
                (i + word_length) > len(words)
                and (len(chunk) < word_length)
                and (len(text_toks) != (idx + 1))
            ):
                text_toks[idx + 1] = chunk + text_toks[idx + 1]
                continue
            chunk = " ".join(chunk).strip()
            chunk = f'[Page no. {idx + start_page}] "{chunk}"'
            chunks.append(chunk)
    return chunks
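
# a quick illustration of the chunking (values made up): with word_length=3,
# the short trailing chunk "d" of page 1 is carried over to page 2:
#
#   text_to_chunks(["a b c d", "e f"], word_length=3)
#   # -> ['[Page no. 1] "a b c"', '[Page no. 2] "d e f"']
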
# semantic search over text chunks with Universal Sentence Encoder embeddings
class SemanticSearch:
    def __init__(self):
        self.use = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
        self.fitted = False

    # embed the data and fit a nearest-neighbours index over the embeddings
    def fit(self, data, batch=1000, n_neighbors=5):
        self.data = data
        self.embeddings = self.get_text_embedding(data, batch=batch)
        n_neighbors = min(n_neighbors, len(self.embeddings))  # never exceed the corpus size
        self.nn = NearestNeighbors(n_neighbors=n_neighbors)
        self.nn.fit(self.embeddings)
        self.fitted = True

    # return the stored chunks (or their indices) nearest to the query text
    def __call__(self, text, return_data=True):
        inp_emb = self.use([text])
        neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0]
        return [self.data[i] for i in neighbors] if return_data else neighbors

    # embed the texts in batches and stack the results into one matrix
    def get_text_embedding(self, texts, batch=1000):
        embeddings = []
        for i in range(0, len(texts), batch):
            text_batch = texts[i : i + batch]
            emb_batch = self.use(text_batch)
            embeddings.append(emb_batch)
        return np.vstack(embeddings)
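
# a minimal usage sketch of SemanticSearch (the chunk strings and the query
# are invented for illustration):
#
#   searcher = SemanticSearch()
#   searcher.fit(['[Page no. 1] "alpha ..."', '[Page no. 2] "beta ..."'],
#                n_neighbors=1)
#   searcher("tell me about alpha")  # -> the nearest chunk string(s)
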
# load a pdf into the global recommender: extract text, chunk it, fit the index
def load_recommender(path, start_page=1):
    global recommender
    texts = pdf_to_text(path, start_page=start_page)
    chunks = text_to_chunks(texts, start_page=start_page)
    recommender.fit(chunks)
    return "Corpus Loaded."
# generate text with the OpenAI completions API
def generate_text(openAI_key, prompt, engine="text-davinci-003"):
    openai.api_key = openAI_key
    completions = openai.Completion.create(  # legacy completions endpoint
        engine=engine,
        prompt=prompt,
        max_tokens=512,  # assumed sampling defaults; the original values are elided in this diff
        n=1,
        stop=None,
        temperature=0.7,
    )
    return completions.choices[0].text
# generate an answer to a question from the top matching chunks
def generate_answer(question, openAI_key):
    topn_chunks = recommender(question)
    prompt = "search results:\n\n"
    for c in topn_chunks:
        prompt += c + "\n\n"
    prompt += (
        "Instructions: Compose a comprehensive reply to the query using the search results given. "
        # the rest of the instruction text (citing the [Page no.] of each
        # result, answering only from the results, etc.) is elided in this diff
    )
    prompt += f"\n\nQuery: {question}\nAnswer: "
    return generate_text(openAI_key, prompt, "text-davinci-003")
# global instance of semantic search
recommender = SemanticSearch()
# load the OpenAI API key from the environment
def load_openai_key() -> str:
    key = os.environ.get("OPENAI_API_KEY")
    if key is None:
        raise ValueError(
            "OPENAI_API_KEY is not set. Get a key at https://platform.openai.com/account/api-keys"
        )
    return key
# answer a question about a pdf fetched from a url
@serving
def ask_url(url: str, question: str):
    download_pdf(url, "corpus.pdf")
    load_recommender("corpus.pdf")
    openAI_key = load_openai_key()
    return generate_answer(question, openAI_key)
# answer a question about an uploaded pdf file
@serving
async def ask_file(file: UploadFile, question: str) -> str:
    suffix = Path(file.filename).suffix
    with NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        shutil.copyfileobj(file.file, tmp)
        tmp_path = Path(tmp.name)
    load_recommender(str(tmp_path))
    openAI_key = load_openai_key()
    return generate_answer(question, openAI_key)
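
# a sketch of how these endpoints are exercised once the module is served with
# langchain-serve (the module name "api", the host, and the key below are
# assumptions, not taken from this commit):
#
#   lc-serve deploy local api        # exposes /ask_url and /ask_file on :8080
#
#   import requests
#   r = requests.post(
#       "http://localhost:8080/ask_url",
#       json={
#           "url": "https://example.com/paper.pdf",
#           "question": "What is the main finding?",
#           "envs": {"OPENAI_API_KEY": "sk-..."},
#       },
#   )
#   print(r.json()["result"])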

# ---------------------------------------------------------------------------
# Gradio client (the second file changed in this commit)
# ---------------------------------------------------------------------------
import json  # JSON encoding/decoding for request payloads
import requests  # HTTP requests to the API host
import gradio as gr  # Gradio web interface
# Send the question, plus either a url or an uploaded file, to the API host
# and return the answer string
def ask_api(
    lcserve_host: str,
    url: str,
    file,
    question: str,
    openAI_key: str,
) -> str:
    # The API host must be an http(s) URL
    if not lcserve_host.startswith("http"):
        raise ValueError("Invalid API Host")

    # Exactly one of url / file must be provided
    if not any([url.strip(), file]):
        raise ValueError("Either URL or PDF should be provided.")
    if all([url.strip(), file]):
        raise ValueError("Both URL and PDF are provided. Please provide only one.")

    if not question.strip():
        raise ValueError("Question field is empty.")

    # Request payload: the question plus the OpenAI key, passed through as an env var
    _data = {
        "question": question,
        "envs": {"OPENAI_API_KEY": openAI_key},
    }
    # With a url, POST to the /ask_url route; otherwise open the file in
    # binary mode and upload it to the /ask_file route along with _data
    if url.strip():
        r = requests.post(f"{lcserve_host}/ask_url", json={"url": url, **_data})
    else:
        with open(file.name, "rb") as f:
            r = requests.post(
                f"{lcserve_host}/ask_file",
                params={"input_data": json.dumps(_data)},  # payload shape assumed; the diff elides these arguments
                files={"file": f},
            )
    try:
        # Surface HTTP errors from the server as a ValueError
        r.raise_for_status()
    except requests.exceptions.HTTPError as e:
        raise ValueError(
            f"Request failed with status code {r.status_code}: {e}"
        ) from e

    # The answer lives under the "result" key of the JSON response
    return r.json()["result"]
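
# A hypothetical call, with placeholder host, URL, and key:
#
#   answer = ask_api(
#       "http://localhost:8080",
#       "https://example.com/paper.pdf",
#       None,                       # no uploaded file when a URL is given
#       "What problem does the paper solve?",
#       "sk-...",
#   )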
# Title and description shown at the top of the Gradio interface
title = "PDF GPT"
description = """PDF GPT allows you to chat with your PDF file using the Universal Sentence Encoder and OpenAI. It produces fewer hallucinated responses than comparable tools because its embeddings are better suited to retrieval than OpenAI's, and its answers can cite the page number, in square brackets ([]), where the information is located, adding credibility and helping you find the relevant passage quickly."""
# Build the Gradio interface
with gr.Blocks() as demo:
    # Heading and description
    gr.Markdown(f"<center><h1>{title}</h1></center>")
    gr.Markdown(description)

    # Two side-by-side groups: the inputs and the answer
    with gr.Row():
        with gr.Group():
            # The API host URL
            lcserve_host = gr.Textbox(
                label="Enter your API Host here",
                value="http://localhost:8080",
                placeholder="http://localhost:8080",
            )

            # A link to the OpenAI key page and a password box for the user's key
            gr.Markdown(
                '<p style="text-align:center">Get your Open AI API key <a href="https://platform.openai.com/account/api-keys">here</a></p>'
            )
            openAI_key = gr.Textbox(
                label="Enter your OpenAI API key here", type="password"
            )

            # Either the URL of a PDF ...
            pdf_url = gr.Textbox(label="Enter PDF URL here")
            gr.Markdown("<center><h4>OR<h4></center>")
            # ... or an uploaded PDF / research paper / book
            file = gr.File(
                label="Upload your PDF/ Research Paper / Book here", file_types=[".pdf"]
            )

            # The question and a submit button
            question = gr.Textbox(label="Enter your question here")
            btn = gr.Button(value="Submit")
            btn.style(full_width=True)

        # Output area where the answer is shown
        with gr.Group():
            answer = gr.Textbox(label="The answer to your question is :")
    # Called when the user clicks Submit: run ask_api and display the result
    def on_click():
        try:
            ans = ask_api(
                lcserve_host.value,
                pdf_url.value,
                file,
                question.value,
                openAI_key.value,
            )
            answer.update(str(ans))
        except ValueError as e:
            # Show the error message in the answer box instead
            answer.update(f"[ERROR]: {str(e)}")

    btn.click(on_click)
# Launch the Gradio interface on port number 7860
demo.launch(server_port=7860)
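
# Assuming this client is saved as app.py (the commit does not show the file
# name), it is started with `python app.py` and served at http://localhost:7860.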
