# File listing metadata: 57 lines, 1.4 KiB, Python
import os.path
|
|
|
|
from fastapi import FastAPI, APIRouter
|
|
from pydantic.types import UUID4
|
|
|
|
from memories.collection.poc import get_files
|
|
from memories.core.db import Item, database
|
|
|
|
# Application object; the route decorators further down register on it.
app = FastAPI(title="Memories")

# Table bootstrap, left disabled:
# database.connect()
# database.create_tables([Item])
# database.close()

# Dedicated router for the item endpoints; it is attached to ``app`` with
# the "/items" prefix once its routes have been declared.
router_items = APIRouter()
|
|
|
|
|
|
@app.get("/healthz")
async def healthz():
    """Health-check endpoint: always answers with a fixed "ok" status."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
|
|
@router_items.get("/")
async def items_list():
    """List items. Placeholder: currently always returns an empty list."""
    items = []
    return items
|
|
|
|
|
|
@router_items.post("/")
async def item_create():
    """Create an item. Placeholder: currently always returns an empty dict."""
    created = {}
    return created
|
|
|
|
|
|
@router_items.get("/{item_id}")
async def item_detail(item_id: UUID4):
    """Return the requested identifier back to the caller.

    ``item_id`` is taken from the URL path and typed as ``UUID4``; no lookup
    is performed, the value is simply echoed under the ``"item_id"`` key.
    """
    response = {"item_id": item_id}
    return response
|
|
|
|
# Attach the item routes to the application under the "/items" prefix,
# tagged as "items".
app.include_router(
    router_items,
    prefix="/items",
    tags=["items"],
)
|
|
|
|
@app.get("/test/sync")
async def test_sync_view():
    """Reconcile the ``Item`` table with the files currently on disk.

    Everything runs inside one ``database.atomic()`` block and happens in
    two passes:

    1. Rows whose ``file_path`` no longer exists on disk are deleted.
    2. Every file yielded by ``get_files()`` that has no row with the same
       ``file_path`` gets a new ``Item`` row built from the file's attributes.

    Returns nothing.
    """
    with database.atomic():
        # Collect the stale rows first so that no row is deleted while the
        # SELECT result is still being iterated.
        stale_items = [
            item
            for item in Item.select()
            if not os.path.exists(item.file_path)
        ]
        for item in stale_items:
            # ``item.delete()`` resolves to the class-level ``Model.delete()``,
            # which only builds an unexecuted DELETE query for the whole
            # table; ``delete_instance()`` actually removes this one row.
            item.delete_instance()

        for file in get_files():
            already_known = (
                Item.select().where(Item.file_path == file.path).exists()
            )
            if already_known:
                continue

            Item.create(
                file_path=file.path,
                kind=Item.VIDEO if file.is_video else Item.PICTURE,
                metadata={"raw": file.is_raw, "panorama": False},
                creation_date=file.datetime,
                checksum=file.checksum,
                exif=file.exif,
                stat=file.stat,
                mimetype=file.mimetype,
            )
|