Merge pull request #7 from sean1832/Code_Style

Refactor and Build Updates
multilingual_support
Zeke Zhang 1 year ago committed by GitHub
commit b54071fe75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,3 @@
from GPT import query
from GPT import toolkit
from GPT import model

@ -7,3 +7,15 @@ class param:
self.present_penalty = present_penalty
self.chunk_count = chunk_count
self.chunk_size = chunk_size
class Model:
    """Holds the user's model selections for a brain run.

    Attributes:
        question_model: model name used to answer the main question.
        other_models: model names for the remaining (non-question) operations.
    """

    def __init__(self, question_model, other_models):
        # Stored verbatim; no validation happens here.
        self.question_model = question_model
        self.other_models = other_models
class Operation:
    """Holds the selected operations for a brain run.

    Attributes:
        operations: all selected operation names.
        operations_no_question: same list with the 'question' entry removed.
    """

    def __init__(self, operations, operations_no_question):
        # Stored verbatim; callers pre-filter the question entry.
        self.operations = operations
        self.operations_no_question = operations_no_question

@ -1,15 +1,15 @@
import openai
import textwrap
from modules import utilities as util
from modules import language
import streamlit as st
from modules import gpt_util as gpt
import modules.utilities as util
import modules.language as language
import GPT
openai.api_key = util.read_file(r'.user\API-KEYS.txt').strip()
if 'SESSION_LANGUAGE' not in st.session_state:
st.session_state['SESSION_LANGUAGE'] = util.read_json_at('.user/language.json', 'SESSION_LANGUAGE', 'en_US')
# if 'SESSION_LANGUAGE' not in st.session_state:
# st.session_state['SESSION_LANGUAGE'] = util.read_json_at('.user/language.json', 'SESSION_LANGUAGE', 'en_US')
SESSION_LANG = st.session_state['SESSION_LANGUAGE']
prompt_dir = f'.user/prompt/{SESSION_LANG}'
@ -23,10 +23,8 @@ def build(chunk_size=4000):
chunks = textwrap.wrap(all_text, chunk_size)
result = []
print('Building brain data...')
for chunk in chunks:
embedding = gpt.embedding(chunk.encode(encoding='ASCII', errors='ignore').decode())
embedding = GPT.toolkit.embedding(chunk.encode(encoding='ASCII', errors='ignore').decode())
info = {'content': chunk, 'vector': embedding}
print(info, '\n\n\n')
result.append(info)
@ -36,7 +34,7 @@ def build(chunk_size=4000):
def run_answer(query, model, temp, max_tokens, top_p, freq_penl, pres_penl, chunk_count):
brain_data = util.read_json(r'.user\brain-data.json')
results = gpt.search_chunks(query, brain_data, chunk_count)
results = GPT.toolkit.search_chunks(query, brain_data, chunk_count)
answers = []
for result in results:
my_info = util.read_file(f'{prompt_dir}/' + _('my-info') + '.txt')
@ -46,7 +44,7 @@ def run_answer(query, model, temp, max_tokens, top_p, freq_penl, pres_penl, chun
prompt = prompt.replace('<<QS>>', query)
prompt = prompt.replace('<<MY-INFO>>', my_info)
answer = gpt.gpt3(prompt, model, temp, max_tokens, top_p, freq_penl, pres_penl)
answer = GPT.toolkit.gpt3(prompt, model, temp, max_tokens, top_p, freq_penl, pres_penl)
answers.append(answer)
all_answers = '\n\n'.join(answers)
@ -58,7 +56,7 @@ def run(query, model, prompt_file, temp, max_tokens, top_p, freq_penl, pres_penl
responses = []
for chunk in chunks:
prompt = util.read_file(prompt_file).replace('<<DATA>>', chunk)
response = gpt.gpt3(prompt, model, temp, max_tokens, top_p, freq_penl, pres_penl)
response = GPT.toolkit.gpt3(prompt, model, temp, max_tokens, top_p, freq_penl, pres_penl)
responses.append(response)
all_response = '\n\n'.join(responses)
return all_response

@ -26,7 +26,7 @@ def search_chunks(text, data, count=1):
'content': item['content'],
'point': point
})
# sort points in descending order
ordered = sorted(points, key=lambda d: d['point'], reverse=True)
return ordered[0:count]

@ -1,102 +1,26 @@
import streamlit as st
from modules import utilities as util
from modules import model_data
from modules import language
import brain
import check_update
import time
import os
# activate session
if 'SESSION_TIME' not in st.session_state:
st.session_state['SESSION_TIME'] = time.strftime("%Y%m%d-%H%H%S")
st.set_page_config(
page_title='GPT Brain'
)
import streamlit as st
util.remove_oldest_file('.user/log', 10)
import modules.INFO as INFO
import modules as mod
import GPT
import modules.utilities as util
import streamlit_toolkit.tools as st_tool
model_options = ['text-davinci-003', 'text-curie-001', 'text-babbage-001', 'text-ada-001']
header = st.container()
body = st.container()
LOG_PATH = '.user/log'
SESSION_TIME = st.session_state['SESSION_TIME']
SESSION_LANG = st.session_state['SESSION_LANGUAGE']
PROMPT_PATH = f'.user/prompt/{SESSION_LANG}'
CURRENT_LOG_FILE = f'{LOG_PATH}/log_{SESSION_TIME}.log'
BRAIN_MEMO = '.user/brain-memo.json'
MANIFEST = '.core/manifest.json'
def create_log():
    """Return the current session's log path, creating the file on first use."""
    log_exists = os.path.exists(CURRENT_LOG_FILE)
    if not log_exists:
        # write the session header exactly once
        header = f'Session {SESSION_TIME}\n\n'
        util.write_file(header, CURRENT_LOG_FILE)
    return CURRENT_LOG_FILE
def log(content, delimiter=''):
    """Append *content* to the session log, optionally under a banner line."""
    target = create_log()
    banner = '' if delimiter == '' else f'\n\n=============={delimiter}==============\n'
    util.write_file(f'\n{banner + content}', target, 'a')
    # cap the log directory at 10 files
    util.remove_oldest_file(INFO.LOG_PATH, 10)
def clear_log():
    """Delete every log file except the current session's."""
    keep = f'log_{SESSION_TIME}.log'
    for root, _dirs, files in os.walk(LOG_PATH):
        for name in files:
            if name != keep:
                os.remove(os.path.join(root, name))
def save_as():
    """Offer the current session log as a plain-text download button."""
    with open(CURRENT_LOG_FILE, 'rb') as log_file:
        payload = log_file.read()
    st.download_button(
        label=_("📥download log"),
        data=payload,
        file_name=f'log_{SESSION_TIME}.txt',
        mime='text/plain'
    )
def process_response(query, target_model, prompt_file: str, data: model_data.param):
    """Run one non-question operation against the brain and show + log it.

    Args:
        query: the text fed into the prompt (typically the question answer).
        target_model: OpenAI model name used for this operation.
        prompt_file: path of the prompt template for this operation.
        data: generation parameters (temp, max_tokens, top_p, penalties).
    """
    file_name = util.get_file_name(prompt_file)
    print(_('Processing') + f" {file_name}...")
    # brain.run substitutes the chunked query into the prompt template
    with st.spinner(_('Thinking on') + f" {file_name}..."):
        results = brain.run(query, target_model, prompt_file,
                            data.temp,
                            data.max_tokens,
                            data.top_p,
                            data.frequency_penalty,
                            data.present_penalty)
    # displaying results
    # NOTE(review): original indentation was lost; display is assumed to
    # follow the spinner block — confirm nesting against the repo.
    st.header(f'📃{file_name}')
    st.info(f'{results}')
    time.sleep(1)
    log(results, delimiter=f'{file_name.upper()}')
def message(msg, condition=None):
    """Show a warning banner unless *condition* is explicitly falsy.

    The original had two branches that both called st.warning with the same
    text; a single guarded call preserves the net behavior: warn when no
    condition is given (None) or when the condition is truthy.
    """
    # NOTE(review): reconstructed from ambiguous indentation — confirm that a
    # falsy non-None condition is indeed meant to suppress the warning.
    if condition is None or condition:
        st.warning("⚠️" + msg)
header = st.container()
body = st.container()
# sidebar
with st.sidebar:
_ = language.set_language()
_ = mod.language.set_language()
st.title(_('Settings'))
language.select_language()
mod.language.select_language()
prompt_files = util.scan_directory(PROMPT_PATH)
prompt_file_names = [util.get_file_name(file) for file in prompt_files]
@ -106,109 +30,69 @@ with st.sidebar:
operation_options = list(prompt_dictionary.keys())
operations = st.multiselect(_('Operations'), operation_options,
default=util.read_json_at(BRAIN_MEMO, f'operations_{SESSION_LANG}', operation_options[0]))
default=util.read_json_at(INFO.BRAIN_MEMO, f'operations_{SESSION_LANG}',
operation_options[0]))
last_question_model = util.read_json_at(BRAIN_MEMO, 'question_model', model_options[0])
last_question_model = util.read_json_at(INFO.BRAIN_MEMO, 'question_model', INFO.MODELS_OPTIONS[0])
# get index of last question model
question_model_index = util.get_index(model_options, last_question_model)
question_model = st.selectbox(_('Question Model'), model_options, index=question_model_index)
question_model_index = util.get_index(INFO.MODELS_OPTIONS, last_question_model)
question_model = st.selectbox(_('Question Model'), INFO.MODELS_OPTIONS, index=question_model_index)
operations_no_question = [op for op in operations if op != _('question')]
other_models = []
replace_tokens = []
for operation in operations_no_question:
last_model = util.read_json_at(BRAIN_MEMO, f'{operation}_model', model_options[0])
last_model = util.read_json_at(INFO.BRAIN_MEMO, f'{operation}_model', INFO.MODELS_OPTIONS[0])
# get index of last model
model_index = util.get_index(model_options, last_model)
model = st.selectbox(f"{operation} " + _('Model'), model_options, index=model_index)
model_index = util.get_index(INFO.MODELS_OPTIONS, last_model)
model = st.selectbox(f"{operation} " + _('Model'), INFO.MODELS_OPTIONS, index=model_index)
other_models.append(model)
temp = st.slider(_('Temperature'), 0.0, 1.0, value=util.read_json_at(BRAIN_MEMO, 'temp', 0.1))
max_tokens = st.slider(_('Max Tokens'), 850, 4500, value=util.read_json_at(BRAIN_MEMO, 'max_tokens', 1000))
temp = st.slider(_('Temperature'), 0.0, 1.0, value=util.read_json_at(INFO.BRAIN_MEMO, 'temp', 0.1))
max_tokens = st.slider(_('Max Tokens'), 850, 4500, value=util.read_json_at(INFO.BRAIN_MEMO, 'max_tokens', 1000))
with st.expander(label=_('Advanced Options')):
top_p = st.slider(_('Top_P'), 0.0, 1.0, value=util.read_json_at(BRAIN_MEMO, 'top_p', 1.0))
top_p = st.slider(_('Top_P'), 0.0, 1.0, value=util.read_json_at(INFO.BRAIN_MEMO, 'top_p', 1.0))
freq_panl = st.slider(_('Frequency penalty'), 0.0, 1.0,
value=util.read_json_at(BRAIN_MEMO, 'frequency_penalty', 0.0))
value=util.read_json_at(INFO.BRAIN_MEMO, 'frequency_penalty', 0.0))
pres_panl = st.slider(_('Presence penalty'), 0.0, 1.0,
value=util.read_json_at(BRAIN_MEMO, 'present_penalty', 0.0))
value=util.read_json_at(INFO.BRAIN_MEMO, 'present_penalty', 0.0))
chunk_size = st.slider(_('Chunk size'), 1500, 4500, value=util.read_json_at(BRAIN_MEMO, 'chunk_size', 4000))
chunk_count = st.slider(_('Answer count'), 1, 5, value=util.read_json_at(BRAIN_MEMO, 'chunk_count', 1))
chunk_size = st.slider(_('Chunk size'), 1500, 4500,
value=util.read_json_at(INFO.BRAIN_MEMO, 'chunk_size', 4000))
chunk_count = st.slider(_('Answer count'), 1, 5, value=util.read_json_at(INFO.BRAIN_MEMO, 'chunk_count', 1))
param = model_data.param(temp=temp,
max_tokens=max_tokens,
top_p=top_p,
frequency_penalty=freq_panl,
present_penalty=pres_panl,
chunk_size=chunk_size,
chunk_count=chunk_count)
param = GPT.model.param(temp=temp,
max_tokens=max_tokens,
top_p=top_p,
frequency_penalty=freq_panl,
present_penalty=pres_panl,
chunk_size=chunk_size,
chunk_count=chunk_count)
if st.button(_('Clear Log'), on_click=clear_log):
op = GPT.model.Operation(operations=operations,
operations_no_question=operations_no_question)
models = GPT.model.Model(question_model=question_model,
other_models=other_models)
if st.button(_('Clear Log'), on_click=st_tool.clear_log):
st.success(_('Log Cleared'))
# info
st.markdown('---')
st.markdown(f"# {util.read_json_at(MANIFEST, 'name')}")
st.markdown(_('Version') + f": {util.read_json_at(MANIFEST, 'version')}")
st.markdown(_('Author') + f": {util.read_json_at(MANIFEST, 'author')}")
st.markdown("[" + _('Report bugs') + "]" + f"({util.read_json_at(MANIFEST, 'bugs')})")
st.markdown("[" + _('Github Repo') + "]" + f"({util.read_json_at(MANIFEST, 'homepage')})")
st.markdown(f"# {util.read_json_at(INFO.MANIFEST, 'name')}")
st.markdown(_('Version') + f": {util.read_json_at(INFO.MANIFEST, 'version')}")
st.markdown(_('Author') + f": {util.read_json_at(INFO.MANIFEST, 'author')}")
st.markdown("[" + _('Report bugs') + "]" + f"({util.read_json_at(INFO.MANIFEST, 'bugs')})")
st.markdown("[" + _('Github Repo') + "]" + f"({util.read_json_at(INFO.MANIFEST, 'homepage')})")
with header:
st.title(_('🧠GPT-Brain'))
st.text(_('This is my personal AI powered brain feeding my own Obsidian notes. Ask anything.'))
message(_("This is a beta version. Please [🪲report bugs](") + util.read_json_at(MANIFEST, 'bugs') + _(
") if you find any."))
def execute_brain(q):
# log question
log(f'\n\n\n\n[{str(time.ctime())}] - QUESTION: {q}')
if check_update.isUpdated():
st.success(_('Building Brain...'))
# if brain-info is updated
brain.build(chunk_size)
st.success(_('Brain rebuild!'))
time.sleep(2)
# thinking on answer
with st.spinner(_('Thinking on Answer')):
answer = brain.run_answer(q, question_model, temp, max_tokens, top_p, freq_panl, pres_panl,
chunk_count=chunk_count)
if util.contains(operations, _('question')):
# displaying results
st.header(_('💬Answer'))
st.info(f'{answer}')
time.sleep(1)
log(answer, delimiter='ANSWER')
# thinking on other outputs
if len(operations_no_question) > 0:
for i in range(len(operations_no_question)):
prompt_path = prompt_dictionary[operations_no_question[i]]
other_model = other_models[i]
process_response(answer, other_model, prompt_path, param)
# convert param to dictionary
param_dict = vars(param)
# write param to json
for key in param_dict:
value = param_dict[key]
util.update_json(BRAIN_MEMO, key, value)
# write operation to json
util.update_json(BRAIN_MEMO, f'operations_{SESSION_LANG}', operations)
# write question model to json
util.update_json(BRAIN_MEMO, 'question_model', question_model)
# write other models to json
for i in range(len(operations_no_question)):
util.update_json(BRAIN_MEMO, f'{operations_no_question[i]}_model', other_models[i])
st_tool.message(_("This is a beta version. Please [🪲report bugs](") +
util.read_json_at(INFO.MANIFEST, 'bugs') + _(") if you find any."))
# main
with body:
@ -217,8 +101,8 @@ with body:
with col1:
send = st.button(_('📩Send'))
with col2:
if os.path.exists(CURRENT_LOG_FILE):
save_as()
if os.path.exists(INFO.CURRENT_LOG_FILE):
st_tool.download_as()
# execute brain calculation
if not question == '' and send:
execute_brain(question)
st_tool.execute_brain(question, param, op, models, prompt_dictionary, SESSION_LANG)

@ -1,73 +0,0 @@
import openai
import numpy as np
import textwrap
import utilities
openai.api_key = utilities.open_file(r'.user\API-KEYS.txt').strip()
BRAIN_DATA = utilities.read_json_file(r'.user\brain-data.json')
# this function compare similarity between two vectors.
# The higher value the dot product have, the more alike between these vectors
# this function compares similarity between two vectors.
# The higher the dot product, the more alike the vectors are.
def similarity(v1, v2):
    """Dot-product similarity between two embedding vectors."""
    return np.dot(v1, v2)
def search_chunks(text, data, count=1):
    """Return the *count* brain-data entries most similar to *text*."""
    query_vec = utilities.embedding(text)
    # score every stored chunk against the query embedding
    scored = [{'content': entry['content'],
               'point': similarity(query_vec, entry['vector'])}
              for entry in data]
    # sort points in descending order
    scored.sort(key=lambda d: d['point'], reverse=True)
    return scored[0:count]
def gpt3(prompt, model='text-davinci-003'):
    """Send *prompt* to an OpenAI completion model and return the stripped text."""
    params = dict(
        model=model,
        prompt=prompt,
        temperature=0.1,
        max_tokens=1000,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )
    response = openai.Completion.create(**params)
    return response['choices'][0]['text'].strip()
def main():
    """Interactive loop: answer questions from brain data, then summarize.

    Reads prompt templates from the local prompt\\ directory, fills in the
    matched brain chunks, prints the combined answer and a model-written
    summary. Loops forever on stdin input.
    """
    while True:
        query = input('\n\nAsk brain: ')
        results = search_chunks(query, BRAIN_DATA)
        answers = []
        for result in results:
            my_info = utilities.open_file(r'prompt\my-info.txt')
            prompt = utilities.open_file(r'prompt\question.txt')
            prompt = prompt.replace('<<INFO>>', result['content'])
            prompt = prompt.replace('<<QS>>', query)
            prompt = prompt.replace('<<MY-INFO>>', my_info)
            answers.append(gpt3(prompt, model='text-davinci-003'))
        all_answers = '\n\n'.join(answers)
        print('\n\n============ANSWER============\n\n', all_answers)
        # summarize the combined answer in manageable chunks
        chunks = textwrap.wrap(all_answers, 10000)
        summaries = []
        for chunk in chunks:
            prompt = utilities.open_file(r'prompt\summarize.txt').replace('<<SUM>>', chunk)
            summaries.append(gpt3(prompt, model='text-curie-001'))
        # fix: banner previously misspelled as "SUMMRY"; also dropped the
        # unused answers_count counter.
        print('\n\n============SUMMARY============\n\n', '\n\n'.join(summaries))


if __name__ == '__main__':
    main()

@ -1,25 +0,0 @@
import openai
import textwrap
import utilities
openai.api_key = utilities.open_file(r'.user\API-KEYS.txt').strip()
def main():
    """Embed .user\\input.txt in 4000-char chunks and persist the brain data."""
    all_text = utilities.open_file(r'.user\input.txt')
    # split text into smaller chunks of 4000 chars each
    result = []
    for chunk in textwrap.wrap(all_text, 4000):
        vector = utilities.embedding(chunk.encode(encoding='ASCII', errors='ignore').decode())
        entry = {'content': chunk, 'vector': vector}
        print(entry, '\n\n\n')
        result.append(entry)
    utilities.write_json_file(result, r'.user\brain-data.json')


if __name__ == '__main__':
    main()

@ -1,44 +0,0 @@
import os
import time
import utilities
file_path = r'.user\input.txt'
temp_file = r'.user\input_last-run.temp'
sig_file = r'.user\input_sig.temp'
def compare_time(t1, t2):
    """True when the two readable timestamps are identical."""
    return t1 == t2
def write_sig(value):
    """Persist the update signal ('updated' / 'not updated') to sig_file.

    Fix: the parameter was named ``bool``, shadowing the builtin; renamed to
    ``value``. All call sites in this file pass it positionally.
    """
    utilities.write_file(value, sig_file)
def check():
    """Compare input.txt's mtime against the cached copy and record a signal."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f'File: {file_path} does not exist.')
    # readable modification time of the watched file
    read_mod_time = time.ctime(os.path.getmtime(file_path))
    if not os.path.exists(temp_file):
        print('Temp file not exist, writing temp file...')
        # first run: cache the mtime, nothing to compare against yet
        utilities.write_file(read_mod_time, temp_file)
        write_sig('not updated')
        return
    temp_info = utilities.open_file(temp_file)
    if compare_time(read_mod_time, temp_info):
        write_sig('not updated')
        print('File has not been updated.')
    else:
        print('File has been updated.')
        utilities.write_file(read_mod_time, temp_file)
        write_sig('updated')
def main():
    """Entry point: run the update check once."""
    check()


if __name__ == '__main__':
    main()

@ -1,27 +0,0 @@
@echo off
cd..
rem fix: echoed message previously misspelled "Virtural"
echo Activating Virtual environment...
call .\venv\Scripts\activate
rem checking if input.txt is updated
python console_app\check_update.py
setlocal enabledelayedexpansion
set "tempFile=.user\input_sig.temp"
for /f "usebackq delims=" %%a in ("%tempFile%") do (
    set "tempValue=%%a"
)
if "%tempValue%" == "not updated" (
    goto end
) else (
    call batch-programs\run-build-brain.bat
    cls
    echo Brain updated!
)
:end
echo running brain...
python console_app\brain.py

@ -1,24 +0,0 @@
import json
import openai
def open_file(filepath):
    """Read a UTF-8 text file and return its full contents."""
    with open(filepath, 'r', encoding='utf-8') as handle:
        return handle.read()
def write_file(content, filepath):
    """Write *content* to *filepath* as UTF-8 text.

    Fix: the file was previously opened without an explicit encoding, so
    non-ASCII content could raise UnicodeEncodeError on platforms whose
    default encoding is not UTF-8 — inconsistent with open_file(), which
    always reads UTF-8.
    """
    with open(filepath, 'w', encoding='utf-8') as handle:
        handle.write(content)
def write_json_file(content, filepath):
    """Serialize *content* to *filepath* as pretty-printed (2-space) JSON."""
    with open(filepath, 'w') as handle:
        json.dump(content, handle, indent=2)
def read_json_file(filepath):
    """Load and return the JSON document stored at *filepath*."""
    with open(filepath, 'r') as handle:
        return json.load(handle)
# return a list of vectors
def embedding(content, engine='text-embedding-ada-002'):
    """Return the OpenAI embedding vector for *content*."""
    response = openai.Embedding.create(input=content, engine=engine)
    return response['data'][0]['embedding']

@ -13,4 +13,5 @@ def create():
{'note_dir': '', 'delimiter': '', 'append_mode': False, 'force_mode': False})
print(f'brain memo file created: {user_dir}/brain-memo.json')
create()

@ -0,0 +1,31 @@
import streamlit as st
import time
import modules.utilities as util

# Must run before any other Streamlit command in the app.
st.set_page_config(
    page_title='GPT Brain'
)

# path
USER_DIR = '.user'
LOG_PATH = '.user/log'
BRAIN_MEMO = '.user/brain-memo.json'
MANIFEST = '.core/manifest.json'
INIT_LANGUAGE = '.user/language.json'

# activate session
# NOTE(review): '%H%H%S' repeats the hour and never records minutes —
# presumably '%H%M%S' was intended; confirm before changing, since log
# file names derive from this value.
if 'SESSION_TIME' not in st.session_state:
    st.session_state['SESSION_TIME'] = time.strftime("%Y%m%d-%H%H%S")
if 'SESSION_LANGUAGE' not in st.session_state:
    st.session_state['SESSION_LANGUAGE'] = util.read_json_at(INIT_LANGUAGE, 'SESSION_LANGUAGE')
if 'FILTER_ROW_COUNT' not in st.session_state:
    st.session_state['FILTER_ROW_COUNT'] = util.read_json_at(BRAIN_MEMO, 'filter_row_count')

SESSION_TIME = st.session_state['SESSION_TIME']
CURRENT_LOG_FILE = f'{LOG_PATH}/log_{SESSION_TIME}.log'
# models
MODELS_OPTIONS = ['text-davinci-003', 'text-curie-001', 'text-babbage-001', 'text-ada-001']

@ -0,0 +1,4 @@
from modules import language
from modules import utilities
from modules import check_update
from modules import INFO

@ -1,6 +1,6 @@
import os
import time
from modules import utilities as util
import modules.utilities as util
file_path = r'.user\input.txt'
temp_file = r'.user\input_last-run.temp'

@ -2,6 +2,7 @@ import gettext
import streamlit as st
import modules.utilities as util
languages = util.read_json('.locals/languages.json')

@ -3,13 +3,6 @@ import os
import glob
# def extract_string(text, delimiter):
# # Extract string between delimiters
# start_index = text.index(delimiter) + len(delimiter)
# end_index = text.index(delimiter, start_index)
# return text[start_index:end_index]
def extract_string(text, delimiter, force=False, join=True, split_mode=False):
# Check if delimiter is not in text
if delimiter not in text:
@ -154,13 +147,23 @@ def update_json(filepath, key, value):
write_json(data, filepath)
def contains(ls: list, item):
    """True when *item* occurs at least once in *ls*."""
    return ls.count(item) > 0
def get_index(ls: list, item, default=0) -> int:
    """Index of *item* in *ls*, or *default* when it is absent."""
    try:
        return ls.index(item)
    except ValueError:
        # item not present: fall back rather than raise
        return default
def extract_frontmatter(content, delimiter='---'):
    """Split out the frontmatter section of *content* into raw lines."""
    # extract metadata between the first pair of delimiters
    try:
        yaml = extract_string(content, delimiter, True, join=False, split_mode=True)[1]
    except IndexError:
        # no delimiter pair present -> empty metadata
        yaml = ''
    return yaml.split('\n')

@ -1,21 +1,15 @@
import time
import os
import streamlit as st
import streamlit_toggle as st_toggle
import os
from modules import utilities as util
import tkinter as tk
from tkinter import filedialog
from modules import language
import modules.language as language
import modules.utilities as util
import modules.INFO as INFO
import streamlit_toolkit.tools as st_tools
user_dir = '.user/'
SESSION_LANG = st.session_state['SESSION_LANGUAGE']
prompt_dir = f'{user_dir}prompt/{SESSION_LANG}/'
brain_memo = f'{user_dir}brain-memo.json'
if 'FILTER_ROW_COUNT' not in st.session_state:
st.session_state['FILTER_ROW_COUNT'] = util.read_json_at(brain_memo, 'filter_row_count')
PROMPT_PATH = f'{INFO.USER_DIR}/prompt/{SESSION_LANG}/'
_ = language.set_language()
@ -26,177 +20,6 @@ st.set_page_config(
body = st.container()
def save(content, path, page='', json_value: dict = None):
    """Render a save button; on click persist *content* and page settings."""
    if json_value is None:
        json_value = []
    if st.button(_('💾Save')):
        util.write_file(content, path)
        st.success(_('✅File saved!'))
        # persist the Brain Memory page's widget state to brain-memo.json
        if page == '💽Brain Memory':
            util.update_json(brain_memo, 'delimiter', json_value['delimiter'])
            util.update_json(brain_memo, 'append_mode', json_value['append_mode'])
            util.update_json(brain_memo, 'force_mode', json_value['force_mode'])
            util.update_json(brain_memo, 'advanced_mode', json_value['advanced_mode'])
            util.update_json(brain_memo, 'filter_info', json_value['filter_info'])
            util.update_json(brain_memo, 'filter_row_count', json_value['filter_row_count'])
        time.sleep(1)
        # refresh page
        st.experimental_rerun()
def select_directory():
    """Open a native folder picker (kept on top) and return the chosen path."""
    root = tk.Tk()
    root.withdraw()
    # make sure the dialog stays above the main browser window
    root.attributes('-topmost', True)
    return filedialog.askdirectory(initialdir=os.getcwd(), title=_('Select Note Directory'), master=root)
def match_logic(operator, filter_val, value):
    """Evaluate one metadata filter: does *value* satisfy *operator* vs *filter_val*?

    Unknown operators return False. Numeric operators return False for
    non-numeric operands.

    Fix: numeric operators previously gated on str.isnumeric(), which is
    False for decimals ('3.14') and signed numbers ('-2'), so those
    comparisons silently failed; parse with float() instead.
    """
    if operator == 'IS':
        return filter_val == value
    if operator == 'IS NOT':
        return filter_val != value
    if operator == 'CONTAINS':
        return filter_val in value
    if operator == 'NOT CONTAINS':
        return filter_val not in value
    # numeric comparisons: value relative to filter_val
    numeric_ops = {
        'MORE THAN': lambda f, v: f < v,
        'LESS THAN': lambda f, v: f > v,
        'MORE THAN OR EQUAL': lambda f, v: f <= v,
        'LESS THAN OR EQUAL': lambda f, v: f >= v,
    }
    compare = numeric_ops.get(operator)
    if compare is None:
        return False
    try:
        return compare(float(filter_val), float(value))
    except (TypeError, ValueError):
        # non-numeric operand: no match
        return False
def extract_frontmatter(content, delimiter='---'):
    """Return the frontmatter lines found between the first delimiter pair."""
    # extract metadata
    try:
        yaml = util.extract_string(content, delimiter, True, join=False, split_mode=True)[1]
    except IndexError:
        # no frontmatter block present
        yaml = ''
    return yaml.split('\n')
def match_fields(pages: list, filter_datas: list[dict]):
    """Return the pages whose frontmatter satisfies every filter, joined.

    Fix: fields were split with field.split(':'), which raises ValueError
    whenever a value itself contains a colon (e.g. 'time: 12:30'); split
    only on the first colon via str.partition.
    """
    filtered_contents = []
    for page in pages:
        fields = extract_frontmatter(page, delimiter='---')
        found_data = []
        for field in fields:
            if field == '':
                continue
            found_key, _sep, found_value = field.partition(':')
            found_data.append({
                'key': found_key.strip(),
                'value': found_value.strip()
            })
        # one True per satisfied filter
        found_match = []
        for data in filter_datas:
            for found in found_data:
                if data['key'].lower() == found['key'].lower():
                    if match_logic(data['logic'], data['value'].lower(), found['value'].lower()):
                        # found single match
                        found_match.append(True)
        # keep the page only when every filter matched
        if found_match.count(True) == len(filter_datas):
            filtered_contents.append(page)
    return '\n\n\n\n'.join(filtered_contents)
def add_filter(num, val_filter_key, val_filter_logic, val_filter_val):
    """Render one key/logic/value filter row and return the three widget values."""
    col1, col2, col3 = st.columns(3)
    with col1:
        filter_key = st.text_input(f'Key{num}', placeholder='Key', value=val_filter_key)
    with col2:
        options = ['CONTAINS',
                   'NOT CONTAINS',
                   'IS',
                   'IS NOT',
                   'MORE THAN',
                   'LESS THAN',
                   'MORE THAN OR EQUAL',
                   'LESS THAN OR EQUAL']
        default_index = util.get_index(options, val_filter_logic, 0)
        logic_select = st.selectbox(f'Logic{num}', options, index=default_index)
    with col3:
        # zero-pad stored integers so they round-trip through the text widget
        if isinstance(val_filter_val, int):
            val_filter_val = "{:02}".format(val_filter_val)
        filter_val = st.text_input(f'value{num}', placeholder='Value', value=val_filter_val)
    return filter_key, logic_select, filter_val
def filter_data(pages: list, add_filter_button, del_filter_button):
    """Render the dynamic filter rows and apply them to *pages*.

    Returns:
        (filtered_contents, filter_datas): the matching pages joined as one
        string, and the list of {'key','logic','value'} dicts configured.
    """
    init_filter_infos = util.read_json_at(brain_memo, 'filter_info')
    filter_datas = []
    # the +/- buttons grow or shrink the number of filter rows shown
    if add_filter_button:
        st.session_state['FILTER_ROW_COUNT'] += 1
    if del_filter_button:
        st.session_state['FILTER_ROW_COUNT'] -= 1
    if st.session_state['FILTER_ROW_COUNT'] >= 1:
        # NOTE(review): rows are 1-based (i == 0 is skipped) while saved
        # values are read at i-1 — confirm the intended alignment.
        for i in range(st.session_state['FILTER_ROW_COUNT'] + 1):
            try:
                init_info = init_filter_infos[i - 1]
                init_key = init_info['key']
                init_logic = init_info['logic']
                init_val = init_info['value']
            except IndexError:
                # no saved row at this position
                init_key = ''
                init_logic = 'CONTAINS'
                init_val = ''
            except KeyError:
                # saved row is missing a field
                init_key = ''
                init_logic = 'CONTAINS'
                init_val = ''
            if i == 0:
                continue
            # add filter
            filter_key, logic_select, filter_val = add_filter(i, init_key, init_logic, init_val)
            data = {'key': filter_key, 'logic': logic_select, 'value': filter_val}
            filter_datas.append(data)
    # filter data
    filtered_contents = match_fields(pages, filter_datas)
    return filtered_contents, filter_datas
def main():
with st.sidebar:
st.title(_('Settings'))
@ -212,10 +35,10 @@ def main():
st.text(_('Configuration of prompts.'))
# read selected file
last_sel_file = util.read_json_at(brain_memo, 'selected_prompt')
all_files = os.listdir(prompt_dir)
last_sel_file = util.read_json_at(INFO.BRAIN_MEMO, 'selected_prompt')
all_files = os.listdir(PROMPT_PATH)
# sort files base on creation time
all_files.sort(key=lambda x: os.path.getmtime(f'{prompt_dir}{x}'), reverse=True)
all_files.sort(key=lambda x: os.path.getmtime(f'{PROMPT_PATH}{x}'), reverse=True)
# index of last selected file
try:
@ -230,9 +53,9 @@ def main():
if st_toggle.st_toggle_switch(_('New Prompt'), label_after=True):
new_file = st.text_input(_('New Prompt Name'), value=_('new_prompt'))
if st.button(_('Create')):
util.write_file('', f'{prompt_dir}{new_file}.txt')
util.write_file('', f'{PROMPT_PATH}{new_file}.txt')
# change select file to new fie
util.update_json(brain_memo, 'selected_prompt', selected_file)
util.update_json(INFO.BRAIN_MEMO, 'selected_prompt', selected_file)
# refresh page
st.experimental_rerun()
with col2:
@ -242,42 +65,42 @@ def main():
if not is_core:
if st_toggle.st_toggle_switch(_('Delete Prompt'), label_after=True):
if st.button(_('❌Delete')):
util.delete_file(f'{prompt_dir}{selected_file}')
util.delete_file(f'{PROMPT_PATH}{selected_file}')
# refresh page
st.experimental_rerun()
selected_path = prompt_dir + selected_file
selected_path = PROMPT_PATH + selected_file
mod_text = st.text_area(_('Prompts'), value=util.read_file(selected_path), height=500)
save(mod_text, selected_path)
st_tools.save(mod_text, selected_path)
if menu == _('💽Brain Memory'):
st.title(_('💽Brain Memory'))
st.text(_('Modify your brain knowledge base.'))
memory_data = util.read_file(f'{user_dir}input.txt')
memory_data = util.read_file(f'{INFO.USER_DIR}/input.txt')
col1, col2 = st.columns(2)
with col1:
st.button(_('🔄Refresh'))
with col2:
if st.button(_('📁Select Note Directory')):
note_dir = select_directory()
util.update_json(brain_memo, 'note_dir', note_dir)
note_dir = st.text_input(_('Note Directory'), value=util.read_json_at(brain_memo, 'note_dir'),
note_dir = st_tools.select_directory()
util.update_json(INFO.BRAIN_MEMO, 'note_dir', note_dir)
note_dir = st.text_input(_('Note Directory'), value=util.read_json_at(INFO.BRAIN_MEMO, 'note_dir'),
placeholder=_('Select Note Directory'), key='note_dir')
col1, col2, col3, col4 = st.columns([1, 2, 2, 2])
with col1:
delimiter_memo = util.read_json_at(brain_memo, 'delimiter')
delimiter_memo = util.read_json_at(INFO.BRAIN_MEMO, 'delimiter')
delimiter = st.text_input(_('Delimiter'), delimiter_memo, placeholder='e.g. +++')
with col2:
append_mode = st.checkbox(_('Append Mode'), value=util.read_json_at(brain_memo, 'append_mode'))
append_mode = st.checkbox(_('Append Mode'), value=util.read_json_at(INFO.BRAIN_MEMO, 'append_mode'))
force_delimiter = st.checkbox(_('Force Delimiter'),
value=util.read_json_at(brain_memo, 'force_mode'))
value=util.read_json_at(INFO.BRAIN_MEMO, 'force_mode'))
with col3:
advanced_mode = st_toggle.st_toggle_switch(_('Filter Mode'),
label_after=True,
default_value=util.read_json_at(brain_memo,
default_value=util.read_json_at(INFO.BRAIN_MEMO,
'advanced_mode', False))
with col4:
if advanced_mode:
@ -291,7 +114,7 @@ def main():
# if advanced mode enabled
if advanced_mode:
note_datas = util.read_files(note_dir, single_string=False)
note_datas, filter_info = filter_data(note_datas, add_filter_button, del_filter_button)
note_datas, filter_info = st_tools.filter_data(note_datas, add_filter_button, del_filter_button)
# note_datas, filter_key, filter_logic, filter_val = filter_data(note_datas, True)
modified_data = util.parse_data(note_datas, delimiter, force_delimiter)
else:
@ -303,7 +126,7 @@ def main():
memory_data = modified_data
mod_text = st.text_area(_('Raw Memory Inputs'), value=memory_data, height=500)
save(mod_text, f'{user_dir}input.txt', _('💽Brain Memory'), {
st_tools.save(mod_text, f'{INFO.USER_DIR}/input.txt', _('💽Brain Memory'), {
'delimiter': delimiter,
'append_mode': append_mode,
'force_mode': force_delimiter,
@ -315,8 +138,8 @@ def main():
if menu == _('🔑API Keys'):
st.title(_('🔑API Keys'))
st.text(_('Configure your OpenAI API keys.'))
mod_text = st.text_input(_('API Keys'), value=util.read_file(f'{user_dir}API-KEYS.txt'))
save(mod_text, f'{user_dir}API-KEYS.txt')
mod_text = st.text_input(_('API Keys'), value=util.read_file(f'{INFO.USER_DIR}/API-KEYS.txt'))
st_tools.save(mod_text, f'{INFO.USER_DIR}/API-KEYS.txt')
if __name__ == '__main__':

@ -0,0 +1 @@
from streamlit_toolkit import tools

@ -0,0 +1,286 @@
import os
import time
import streamlit as st
import tkinter as tk
from tkinter import filedialog
import modules.utilities as util
import modules.INFO as INFO
import modules as mod
import GPT
_ = mod.language.set_language()
def create_log():
    """Return the session log path, writing the header on first call."""
    if not os.path.exists(INFO.CURRENT_LOG_FILE):
        header = f'Session {INFO.SESSION_TIME}\n\n'
        util.write_file(header, INFO.CURRENT_LOG_FILE)
    return INFO.CURRENT_LOG_FILE
def log(content, delimiter=''):
    """Append *content* to the session log, optionally under a banner line."""
    target = create_log()
    banner = f'\n\n=============={delimiter}==============\n' if delimiter != '' else ''
    util.write_file(f'\n{banner + content}', target, 'a')
def clear_log():
    """Delete every log file except the current session's."""
    keep = f'log_{INFO.SESSION_TIME}.log'
    for root, _dirs, files in os.walk(INFO.LOG_PATH):
        for name in files:
            if name != keep:
                os.remove(os.path.join(root, name))
def download_as():
    """Offer the current session log as a plain-text download button."""
    with open(INFO.CURRENT_LOG_FILE, 'rb') as log_file:
        payload = log_file.read()
    st.download_button(
        label=_("📥download log"),
        data=payload,
        file_name=f'log_{INFO.SESSION_TIME}.txt',
        mime='text/plain'
    )
def save(content, path, page='', json_value: dict = None):
    """Render a save button; on click persist *content* and page settings.

    Fix: the None fallback for *json_value* was a list ([]) despite the dict
    annotation; with an empty dict, later key lookups stay type-correct
    instead of raising TypeError on a list.
    """
    if json_value is None:
        json_value = {}
    if st.button(_('💾Save')):
        util.write_file(content, path)
        st.success(_('✅File saved!'))
        # persist the Brain Memory page's widget state to brain-memo.json
        if page == '💽Brain Memory':
            util.update_json(INFO.BRAIN_MEMO, 'delimiter', json_value['delimiter'])
            util.update_json(INFO.BRAIN_MEMO, 'append_mode', json_value['append_mode'])
            util.update_json(INFO.BRAIN_MEMO, 'force_mode', json_value['force_mode'])
            util.update_json(INFO.BRAIN_MEMO, 'advanced_mode', json_value['advanced_mode'])
            util.update_json(INFO.BRAIN_MEMO, 'filter_info', json_value['filter_info'])
            util.update_json(INFO.BRAIN_MEMO, 'filter_row_count', json_value['filter_row_count'])
        time.sleep(1)
        # refresh page
        st.experimental_rerun()
def match_logic(operator, filter_val, value):
    """Evaluate one frontmatter filter against a found value.

    :param operator: one of IS / IS NOT / CONTAINS / NOT CONTAINS /
        MORE THAN / LESS THAN / MORE THAN OR EQUAL / LESS THAN OR EQUAL
    :param filter_val: the value entered in the filter row
    :param value: the value found in the page's frontmatter
    :return: True when *value* satisfies the filter; False otherwise
        (including unknown operators and non-numeric input to numeric ops)
    """
    if operator == 'IS':
        return filter_val == value
    if operator == 'IS NOT':
        return filter_val != value
    if operator == 'CONTAINS':
        return filter_val in value
    if operator == 'NOT CONTAINS':
        return filter_val not in value
    if operator in ('MORE THAN', 'LESS THAN',
                    'MORE THAN OR EQUAL', 'LESS THAN OR EQUAL'):
        # Bug fix: the old code gated on str.isnumeric(), which rejects
        # floats ("3.5"), negatives ("-2"), and non-strings, so numeric
        # comparisons silently returned False. Attempt a real float
        # conversion of BOTH operands instead.
        try:
            filter_num = float(filter_val)
            value_num = float(value)
        except (TypeError, ValueError):
            return False
        if operator == 'MORE THAN':
            return value_num > filter_num
        if operator == 'LESS THAN':
            return value_num < filter_num
        if operator == 'MORE THAN OR EQUAL':
            return value_num >= filter_num
        return value_num <= filter_num
    return False
def select_directory():
    """Pop a native folder-picker dialog and return the chosen directory path."""
    picker_root = tk.Tk()
    picker_root.withdraw()
    # make sure the dialog is on top of the main window
    picker_root.attributes('-topmost', True)
    return filedialog.askdirectory(initialdir=os.getcwd(), title=_('Select Note Directory'))
def match_fields(pages: list, filter_datas: list[dict]):
    """Keep the pages whose frontmatter satisfies every filter, joined as one string.

    :param pages: page contents; each is expected to carry '---'-delimited
        frontmatter parsed by util.extract_frontmatter
    :param filter_datas: list of {'key', 'logic', 'value'} filter dicts
    :return: matching pages concatenated with blank-line separators
    """
    filtered_contents = []
    for page in pages:
        fields = util.extract_frontmatter(page, delimiter='---')
        # parse "key: value" frontmatter lines into normalized records
        found_data = []
        for field in fields:
            if field == '':
                continue
            # Bug fix: split on the FIRST ':' only, so values that contain a
            # colon themselves (timestamps, URLs) no longer raise ValueError.
            found_key, found_value = field.split(':', 1)
            found_data.append({
                'key': found_key.strip(),
                'value': found_value.strip()
            })
        found_match = []
        for data in filter_datas:
            for found in found_data:
                if data['key'].lower() == found['key'].lower():
                    if match_logic(data['logic'], data['value'].lower(), found['value'].lower()):
                        # found single match
                        found_match.append(True)
        # page qualifies when each filter produced exactly one match
        if found_match.count(True) == len(filter_datas):
            filtered_contents.append(page)
    combined_contents = '\n\n\n\n'.join(filtered_contents)
    return combined_contents
def add_filter(num, val_filter_key, val_filter_logic, val_filter_val):
    """Render one key/logic/value filter row and return the user's current inputs."""
    # filters
    key_col, logic_col, val_col = st.columns(3)
    with key_col:
        filter_key = st.text_input(f'Key{num}', placeholder='Key', value=val_filter_key)
    with logic_col:
        options = ['CONTAINS',
                   'NOT CONTAINS',
                   'IS',
                   'IS NOT',
                   'MORE THAN',
                   'LESS THAN',
                   'MORE THAN OR EQUAL',
                   'LESS THAN OR EQUAL']
        logic_select = st.selectbox(f'Logic{num}', options,
                                    index=util.get_index(options, val_filter_logic, 0))
    with val_col:
        # saved ints are zero-padded so e.g. 7 round-trips as "07"
        if isinstance(val_filter_val, int):
            val_filter_val = "{:02}".format(val_filter_val)
        filter_val = st.text_input(f'value{num}', placeholder='Value', value=val_filter_val)
    return filter_key, logic_select, filter_val
def filter_data(pages: list, add_filter_button, del_filter_button):
    """Render the dynamic filter rows and apply them to *pages*.

    :param pages: page contents to filter
    :param add_filter_button: truthy when the user clicked "add filter"
    :param del_filter_button: truthy when the user clicked "delete filter"
    :return: (filtered_contents, filter_datas) — the joined matching pages
        and the list of {'key', 'logic', 'value'} dicts currently entered
    """
    init_filter_infos = util.read_json_at(INFO.BRAIN_MEMO, 'filter_info')
    filter_datas = []
    if add_filter_button:
        st.session_state['FILTER_ROW_COUNT'] += 1
    if del_filter_button:
        st.session_state['FILTER_ROW_COUNT'] -= 1
    if st.session_state['FILTER_ROW_COUNT'] >= 1:
        # rows are 1-based; row i is pre-filled from saved entry i-1 when present
        # (the original looped from 0 and skipped i == 0, which had no effect)
        for i in range(1, st.session_state['FILTER_ROW_COUNT'] + 1):
            try:
                init_info = init_filter_infos[i - 1]
                init_key = init_info['key']
                init_logic = init_info['logic']
                init_val = init_info['value']
            except (IndexError, KeyError):
                # no (or malformed) saved state: fall back to a blank CONTAINS row
                init_key = ''
                init_logic = 'CONTAINS'
                init_val = ''
            # add filter
            filter_key, logic_select, filter_val = add_filter(i, init_key, init_logic, init_val)
            data = {'key': filter_key, 'logic': logic_select, 'value': filter_val}
            filter_datas.append(data)
    # filter data
    filtered_contents = match_fields(pages, filter_datas)
    return filtered_contents, filter_datas
def process_response(query, target_model, prompt_file: str, data: GPT.model.param):
    """Run one prompt file against *target_model*, then display and log the result."""
    # check if exclude model is not target model
    file_name = util.get_file_name(prompt_file)
    with st.spinner(_('Thinking on ') + f"{file_name}..."):
        results = GPT.query.run(query, target_model, prompt_file,
                                data.temp, data.max_tokens, data.top_p,
                                data.frequency_penalty, data.present_penalty)
        # displaying results
        st.header(f'📃{file_name}')
        st.info(f'{results}')
        time.sleep(1)
        log(results, delimiter=f'{file_name.upper()}')
def execute_brain(q, params: GPT.model.param,
                  op: GPT.model.Operation,
                  model: GPT.model.Model,
                  prompt_dictionary: dict,
                  session_language):
    """Answer question *q* against the brain data and run follow-up operations.

    Sequence: log the question; rebuild the brain if source data changed;
    query the answer model; optionally display/log the answer; run each
    non-question operation through its own model; finally persist all
    parameters and model choices to BRAIN_MEMO.

    :param q: the user's question text
    :param params: generation parameters (temp, max_tokens, chunk sizes, ...)
    :param op: selected operations, with and without the 'question' entry
    :param prompt_dictionary: maps operation display name -> prompt file path
    :param session_language: language code used to key the saved operations
    """
    # log question
    log(f'\n\n\n\n[{str(time.ctime())}] - QUESTION: {q}')
    if mod.check_update.isUpdated():
        st.success(_('Building Brain...'))
        # if brain-info is updated
        GPT.query.build(params.chunk_size)
        st.success(_('Brain rebuild!'))
        time.sleep(2)
    # thinking on answer
    with st.spinner(_('Thinking on Answer')):
        answer = GPT.query.run_answer(q, model.question_model,
                                      params.temp,
                                      params.max_tokens,
                                      params.top_p,
                                      params.frequency_penalty,
                                      params.present_penalty,
                                      chunk_count=params.chunk_count)
        if util.contains(op.operations, _('question')):
            # displaying results
            st.header(_('💬Answer'))
            st.info(f'{answer}')
            time.sleep(1)
            log(answer, delimiter='ANSWER')
    # thinking on other outputs
    # NOTE(review): operations_no_question and other_models are paired by
    # index — presumably kept in sync by the caller; verify when editing.
    if len(op.operations_no_question) > 0:
        for i in range(len(op.operations_no_question)):
            prompt_path = prompt_dictionary[op.operations_no_question[i]]
            other_model = model.other_models[i]
            process_response(answer, other_model, prompt_path, params)
    # convert param to dictionary
    param_dict = vars(params)
    # write param to json (one BRAIN_MEMO entry per param attribute)
    for key in param_dict:
        value = param_dict[key]
        util.update_json(INFO.BRAIN_MEMO, key, value)
    # write operation to json (keyed by session language)
    util.update_json(INFO.BRAIN_MEMO, f'operations_{session_language}', op.operations)
    # write question model to json
    util.update_json(INFO.BRAIN_MEMO, 'question_model', model.question_model)
    # write other models to json
    for i in range(len(op.operations_no_question)):
        util.update_json(INFO.BRAIN_MEMO, f'{op.operations_no_question[i]}_model', model.other_models[i])
def message(msg, condition=None):
    """Show a warning banner unless *condition* is an explicit falsy value."""
    # condition=None means "always warn"; any non-None falsy value suppresses it
    if condition is None or condition:
        st.warning("⚠️" + msg)
Loading…
Cancel
Save