Building a CRUD script using Python and SQLite
by bfarrell on Apr 14, 2025

I built a CRUD script with Python and SQLite as my CS50P final project.
Functionality
-
Create and initialize a SQLite database
-
Get user input and run actions such as:
- Adding, viewing, updating, and deleting data
- Searching data
- Exporting data as an
HTML
table - Clearing input history
- Canceling actions after starting one
-
Testing using
pytest
. View tests.
Modules and packages
-
sqlite3
-
os
-
time
- Using
time.sleep()
to provide a brief delay before running certain actions
- Using
-
webbrowser
- Allowing the user to open the
HTML
output in their browser
- Allowing the user to open the
-
re
- Validating date input
-
platform
- Detecting the platform using
platform.system()
to ensure theHTML
opens correctly (e.g. usingfile:///path/to/file.html
on macOS) and running the correctos.system
clear command
- Detecting the platform using
-
tabulate
- Formatting the
db
data before printing and exporting it as anHTML
table
- Formatting the
The script
import sqlite3, os, time, webbrowser, re, platform
from tabulate import tabulate
def connect_db():
"""
Return db connection
"""
return sqlite3.connect("db.sqlite3")
def add_data(item):
"""
Accepts a tuple or list
Adds item to db
"""
with connect_db() as con:
cur = con.cursor()
cur.executemany("INSERT INTO items (name, purchase_date, purchase_price, purchase_location) VALUES (?, ?, ?, ?)", [item])
con.commit()
print("Item added!")
def delete_data(id):
"""
Accepts id as an int
Deletes specified item from db if it exists
"""
try:
with connect_db() as con:
cur = con.cursor()
cur.execute("SELECT * FROM items WHERE rowid = ?", (id,))
row = cur.fetchone() or None
if not row == None:
cur.execute(f"DELETE FROM items WHERE rowid={id}")
con.commit()
print("Item deleted")
else:
print("ID not found")
except sqlite3.OperationalError as e:
print(e)
def update_data(id):
"""
Accepts id as an int
Allows updating of item in db if it exists
"""
try:
with connect_db() as con:
cur = con.cursor()
data = cur.execute("SELECT * FROM items WHERE rowid = ?", (id,))
if data != None:
data_list = format_data_as_list(data)
print("You are updating:")
print(format_rows_as_table(data_list))
item = get_item_input("updating")
if len(item) == 4:
try:
update_statement = "UPDATE items SET name=?, purchase_date=?, purchase_price=?, purchase_location=? WHERE rowid = ?"
name, date, price, location = item[0], item[1], item[2], item[3]
cur.execute(update_statement, (name, date, price, location, id))
print("Item updated:")
data = cur.execute("SELECT * FROM items WHERE rowid = ?", (id,))
data_list = format_data_as_list(data)
print(format_rows_as_table(data_list))
except sqlite3.OperationalError as e:
print(e)
else:
print("ID not found")
except sqlite3.OperationalError as e:
print(e)
def format_rows_as_table(list):
"""
Return data as formatted table using tabulate
"""
return tabulate(list, headers=["ID", "Item name", "Date", "Price", "Purchase Location"], tablefmt="rounded_grid")
def format_data_as_list(data):
"""
Returns db data in list
"""
list = []
for row in data:
id, name, date, price, location = row[0], row[1], row[2], row[3], row[4]
row = [id, name, date, price, location]
list.append(row)
return list
def view_data():
"""
Prints db data in table
"""
with connect_db() as con:
try:
cur = con.cursor()
data = cur.execute("SELECT * FROM items")
if data != None:
data_list = format_data_as_list(data)
print(format_rows_as_table(data_list))
else:
print("No items in db")
except sqlite3.OperationalError as e:
print(e)
def export_data():
"""
Formats db data as HTML
"""
with connect_db() as con:
try:
cur = con.cursor()
data = cur.execute("SELECT * FROM items")
if data != None:
table = format_data_as_list(data)
html = tabulate(table, headers=["ID", "Item name", "Date", "Price", "Purchase Location"], tablefmt="html")
create_html(html)
else:
print("No items in db")
except sqlite3.OperationalError as e:
print(e)
def create_html(html):
"""
Writes the formatted HTML data from export_data to the output file
Prompts user whether they would like to open output file
"""
html = html.replace("<table>", "<table class='table'>")
with open("output.html", "w") as file:
file.write("<html>")
file.write("<head>")
file.write('<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">')
file.write("<title>Exported Data</title>")
file.write("</head>")
file.write("<body>")
file.write(html)
file.write("</body>")
file.write("</html>")
while True:
open_file = input("output.html created. Open? (y/n) ").strip()
if open_file.lower() == "y":
try:
print("Opening...")
time.sleep(0.5)
plat = platform.system()
abs_path = os.path.abspath("output.html")
if plat in ("Linux", "Darwin"):
webbrowser.open(f"file://{abs_path}", new=0)
print(abs_path, "Mac or Linux")
elif plat == "Windows":
webbrowser.open(abs_path, new=0)
print(abs_path, "Windows")
break
except webbrowser.Error as e:
print(e)
elif open_file.lower() == "n":
break
else:
print("Invalid input. Enter (y/n)")
def clear_history():
"""
Clears user input history
"""
print("Clearing...")
plat = platform.system()
time.sleep(0.5)
if plat in ("Linux", "Darwin"):
os.system("clear")
elif plat == "Windows":
os.system("cls")
def validate_date(str):
"""
Return True if date matches regex for YYYY-MM-DD format
"""
return bool(re.match(r"^(\d{4})-(0[1-9]|1[0-2]|[1-9])-([1-9]|0[1-9]|[1-2]\d|3[0-1])$", str))
def search(str):
"""
Finds item in db in which the name contains a match to the str param
"""
with connect_db() as con:
try:
cur = con.cursor()
query = "SELECT * FROM items WHERE name LIKE CONCAT('%', ?, '%')"
cur.execute(query, (str,))
rows = cur.fetchall()
if rows:
print("Found:")
table = []
for row in rows:
id, name, date, price, location = row[0], row[1], row[2], row[3], row[4]
row = [id, name, date, price, location]
table.append(row)
print(format_rows_as_table(table))
else:
print(f"No items found for '{str}'")
except sqlite3.OperationalError as e:
print(e)
def get_item_input(action):
"""
Handle getting user input for item
Returns list with validated item input
Used when adding or updating an item
"""
canceled = False
item = []
fields = ["name", "date", "price", "location"]
print("(c) to cancel operation")
for field in fields:
while True:
if field == "name" or field == "location":
input_val = input(f"{field}: ").strip().lower()
if input_val == "c":
canceled = True
break
elif input_val:
item.append(input_val)
break
else:
print(f"{field} required")
if field == "date":
input_val = input(f"{field}: ").strip()
if input_val == "c":
canceled = True
break
elif validate_date(input_val):
item.append(input_val)
break
else:
print(f"{field} required. YYYY-MM-DD")
if field == "price":
input_val = input(f"{field}: ").strip()
if input_val == "c":
canceled = True
break
elif input_val:
try:
item.append(float(input_val))
break
except ValueError:
print(f"{field} required. Enter a number, e.g. 120 or 120.25")
if canceled:
print(f"Canceled {action} item.")
break
if input_val != "e" and len(item) == 4:
return item
def get_input():
"""
Handles getting user input and calling CRUD functions
"""
while True:
print("(v)iew, (a)dd, (d)elete, (u)pdate, (s)earch, (cl)ear, (ex)port or (e)xit")
action = input("What would you like to do? ").strip().lower()
if action == "v":
view_data()
elif action == "a":
item = get_item_input("adding")
if item:
add_data(item)
elif action == "d":
print("(c) to cancel operation")
input_val = input("ID to delete: ").strip()
if input_val == "c":
print("Canceled operation")
continue
elif input_val:
try:
id = int(input_val)
delete_data(id)
except ValueError:
print("ID must be a number")
else:
print("ID is required")
elif action == "u":
print("(c) to cancel operation")
input_val = input("ID to update: ").strip()
if input_val == "c":
print("Canceled operation")
continue
elif input_val:
try:
id = int(input_val)
update_data(id)
except ValueError:
print("ID must be a number")
else:
print("ID is required")
elif action == "s":
print("(c) to cancel operation")
input_val = input("Search for item name: ").strip()
if input_val == "c":
print("Canceled operation")
continue
elif input_val:
search(input_val)
else:
print("Search term required")
elif action == "cl":
clear_history()
elif action == "ex":
export_data()
elif action == "e":
print("Exiting...")
break
else:
print("Invalid input")
def main():
with connect_db() as con:
try:
cur = con.cursor()
cur.execute('''
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
purchase_date TEXT,
purchase_price TEXT,
purchase_location TEXT
)
''')
get_input()
except sqlite3.OperationalError as e:
print(e)
if __name__ == "__main__":
main()
Tests
from project import validate_date
def test_validate_date():
assert validate_date("2025-05-12") == True
assert validate_date("2025-6-12") == True
assert validate_date("2025-06-1") == True
assert validate_date("2025-6-01") == True
assert validate_date("2025-05-32") == False
assert validate_date("2025-13-12") == False
assert validate_date("2025/06/12") == False
assert validate_date("April 14, 2025") == False
assert validate_date("") == False