- TypeScript types + client SDK layout (matches the OpenAPI you’ve got)
- Direct-to-storage upload helper (signed URL flow)
- Job processor spec (queue, retries, idempotency, chaining, debouncing, concurrency)
- Worker pseudocode (claim jobs safely, at-least-once without chaos)
Everything below is designed so a small team can ship V1 without drowning.
1) TypeScript Frontend Contract
1.1 Folder layout for the SDK
kilo-sdk/
src/
index.ts
client.ts
types.ts
errors.ts
pagination.ts
realtime.ts
uploads.ts
package.json
tsconfig.json
README.md
- `types.ts` = request/response types
- `client.ts` = fetch wrapper + endpoints
- `uploads.ts` = signed upload helper (PUT with headers)
- `realtime.ts` = SSE wrapper
- `errors.ts` = typed API errors
- `pagination.ts` = cursor helpers
1.2 types.ts (core types)
// src/types.ts
export type UUID = string;
export type ISODateTime = string; // "2026-01-14T12:34:56Z"
export type ISODate = string; // "2026-01-14"
export type Cursor = string;
export interface ApiError {
code: string;
message: string;
details?: Record<string, unknown>;
requestId?: string;
}
export interface PageInfo {
nextCursor?: Cursor | null;
}
export interface User {
id: UUID;
email: string;
displayName: string;
createdAt: ISODateTime;
}
export type ProjectStatus = "active" | "archived" | "deleted";
export interface Project {
id: UUID;
studioId?: UUID | null;
ownerUserId: UUID;
title: string;
description?: string | null;
shootDate?: ISODate | null;
timezone?: string | null;
status: ProjectStatus;
stats?: Record<string, unknown>;
createdAt: ISODateTime;
updatedAt: ISODateTime;
}
export interface Paged<T> {
items: T[];
pageInfo: PageInfo;
}
export interface CreateProjectRequest {
title: string;
description?: string;
shootDate?: ISODate;
timezone?: string;
}
export interface UpdateProjectRequest {
title?: string;
description?: string | null;
shootDate?: ISODate | null;
timezone?: string | null;
status?: ProjectStatus;
}
export interface LoginStartRequest {
email: string;
locale?: string;
}
export interface LoginStartResponse {
challengeId: string;
delivery: "email_code" | "magic_link";
}
export interface LoginVerifyRequest {
challengeId: string;
code: string;
}
export interface LoginVerifyResponse {
accessToken: string;
refreshToken: string;
user: User;
}
// Assets
export interface Rating {
assetId: UUID;
userId: UUID;
rating?: number | null; // 1-5
picked?: boolean | null;
rejected?: boolean | null;
starred?: boolean | null;
notes?: string | null;
updatedAt?: ISODateTime | null;
}
export interface Asset {
id: UUID;
projectId: UUID;
capturedAt?: ISODateTime | null;
ingestedAt: ISODateTime;
widthPx?: number | null;
heightPx?: number | null;
cameraMake?: string | null;
cameraModel?: string | null;
lensModel?: string | null;
focalLengthMm?: number | null;
shutterSpeed?: string | null;
aperture?: number | null;
iso?: number | null;
exif?: Record<string, unknown>;
iptc?: Record<string, unknown>;
flags?: Record<string, unknown>;
myRating?: Rating;
bestPreviewUrl?: string | null;
}
export interface UpdateAssetRequest {
iptc?: Record<string, unknown>;
keywords?: string[];
notes?: string | null;
flags?: Record<string, unknown>;
}
export type AssetFileKind = "original" | "thumbnail" | "preview" | "export";
export interface AssetFile {
kind: AssetFileKind;
url: string;
contentType?: string | null;
byteSize?: number | null;
checksumSha256?: string | null;
expiresAt?: ISODateTime | null;
}
export interface AssetFilesResponse {
files: AssetFile[];
}
export interface UpsertRatingRequest {
rating?: number | null; // 1-5
picked?: boolean | null;
rejected?: boolean | null;
starred?: boolean | null;
notes?: string | null;
}
// Upload
export interface UploadFileDescriptor {
clientFileId?: string | null;
filename: string;
byteSize: number;
contentType?: string | null;
capturedAt?: ISODateTime | null;
}
export interface PrepareUploadRequest {
files: UploadFileDescriptor[];
}
export interface UploadInstruction {
clientFileId?: string | null;
assetId: UUID;
uploadUrl: string;
headers?: Record<string, string>;
expiresAt?: ISODateTime | null;
}
export interface PrepareUploadResponse {
uploads: UploadInstruction[];
}
export interface FinalizeUploadRequest {
assets: Array<{
assetId: UUID;
checksumSha256: string;
contentType?: string | null;
}>;
}
export interface FinalizeUploadResponse {
queuedJobs: UUID[];
}
// Clusters
export type ClusterKind = "moment" | "burst" | "duplicate_group";
export interface ClusterAsset {
assetId: UUID;
rank: number; // 1..N
role?: "candidate" | "winner" | "alt" | null;
signals?: Record<string, unknown>;
}
export interface Cluster {
id: UUID;
projectId: UUID;
kind: ClusterKind;
title?: string | null;
startTime?: ISODateTime | null;
endTime?: ISODateTime | null;
score?: number | null;
reviewed?: boolean | null;
winnerAssetId?: UUID | null;
whyWinner?: string[];
assets: ClusterAsset[];
}
export interface UpdateClusterRequest {
title?: string | null;
reviewed?: boolean | null;
manualOverrides?: Record<string, unknown>;
}
export interface SetWinnerRequest {
winnerAssetId: UUID;
}
// Bulk cull
export interface BulkCullActionRequest {
actions: Array<{
assetId: UUID;
picked?: boolean | null;
rejected?: boolean | null;
rating?: number | null; // 1-5
starred?: boolean | null;
}>;
}
export interface BulkCullActionResponse {
ok: boolean;
updatedAssetIds?: UUID[];
}
// Edits
export interface EditVersion {
id: UUID;
assetId: UUID;
userId: UUID;
parentId?: UUID | null;
name?: string | null;
params: Record<string, unknown>;
createdAt: ISODateTime;
}
export interface EditVersionList {
items: EditVersion[];
}
export interface CreateEditVersionRequest {
name?: string | null;
parentId?: UUID | null;
params: Record<string, unknown>;
}
export interface ApplyEditBatchRequest {
assetIds: UUID[];
mode?: "create_versions" | "overwrite_latest";
name?: string | null;
}
export interface ApplyEditBatchResponse {
jobId: UUID;
status: "queued" | "running" | "done";
}
// Search
export interface SearchResponse {
results: Array<{
assetId: UUID;
score: number;
highlights?: string[];
}>;
tookMs?: number;
}
export interface AdvancedSearchRequest {
q: string;
filters?: Record<string, unknown>;
}
// Galleries
export interface Gallery {
id: UUID;
projectId: UUID;
title: string;
shareSlug: string;
expiresAt?: ISODateTime | null;
watermark: boolean;
allowDownloads: boolean;
requiresPassword: boolean;
assetCount: number;
createdAt: ISODateTime;
}
export interface CreateGalleryRequest {
title: string;
assetIds: UUID[];
password?: string | null;
expiresAt?: ISODateTime | null;
watermark?: boolean;
allowDownloads?: boolean;
}
export interface UpdateGalleryRequest {
title?: string;
password?: string | null;
expiresAt?: ISODateTime | null;
watermark?: boolean;
allowDownloads?: boolean;
}
export interface AddGalleryAssetsRequest {
assetIds: UUID[];
}
export interface RemoveGalleryAssetsRequest {
assetIds: UUID[];
}
export interface PublicGalleryResponse {
title: string;
requiresAuth: boolean;
token?: string | null;
assets?: Array<{
assetId: UUID;
previewUrl: string;
favoriteCount?: number | null;
commentsCount?: number | null;
}>;
}
export interface PublicGalleryAuthRequest {
password: string;
}
export interface PublicGalleryAuthResponse {
token: string;
}
export interface PublicFavoriteRequest {
assetId: UUID;
favorite: boolean;
}
export interface PublicFavoriteResponse {
ok: boolean;
}
export interface PublicCommentRequest {
assetId: UUID;
text: string;
}
export interface PublicCommentResponse {
commentId: UUID;
}
// Exports
export type ExportStatus = "queued" | "running" | "done" | "failed";
export type ExportPreset = "full_res" | "web" | "instagram_carousel" | "story_9x16" | "contact_sheet_pdf";
export interface Export {
id: UUID;
projectId: UUID;
preset: ExportPreset;
settings?: Record<string, unknown>;
status: ExportStatus;
progress?: number | null;
createdAt: ISODateTime;
}
export interface CreateExportRequest {
preset: ExportPreset;
assetIds?: UUID[] | null;
settings?: Record<string, unknown>;
}
export interface ExportDownloadResponse {
url: string;
expiresAt?: ISODateTime | null;
}
// Jobs
export type JobStatus = "queued" | "running" | "done" | "failed" | "canceled";
export interface Job {
id: UUID;
projectId?: UUID | null;
assetId?: UUID | null;
type: string;
status: JobStatus;
progress?: number | null;
error?: string | null;
createdAt: ISODateTime;
updatedAt: ISODateTime;
}
1.3 errors.ts (clean error handling)
// src/errors.ts
import type { ApiError } from "./types";
export class KiloApiError extends Error {
public readonly status: number;
public readonly body?: ApiError;
constructor(status: number, body?: ApiError) {
super(body?.message ?? `API Error (${status})`);
this.name = "KiloApiError";
this.status = status;
this.body = body;
}
}
export function isKiloApiError(e: unknown): e is KiloApiError {
return e instanceof KiloApiError;
}
1.4 client.ts (typed API client)
// src/client.ts
import { KiloApiError } from "./errors";
import type {
LoginStartRequest, LoginStartResponse,
LoginVerifyRequest, LoginVerifyResponse,
CreateProjectRequest, UpdateProjectRequest,
Project, Asset, UpdateAssetRequest, AssetFilesResponse, Rating, UpsertRatingRequest,
PrepareUploadRequest, PrepareUploadResponse,
FinalizeUploadRequest, FinalizeUploadResponse,
Cluster, UpdateClusterRequest,
SetWinnerRequest,
BulkCullActionRequest, BulkCullActionResponse,
EditVersionList, CreateEditVersionRequest, EditVersion,
ApplyEditBatchRequest, ApplyEditBatchResponse,
SearchResponse, AdvancedSearchRequest,
CreateGalleryRequest, Gallery, UpdateGalleryRequest,
AddGalleryAssetsRequest, RemoveGalleryAssetsRequest,
PublicGalleryResponse, PublicGalleryAuthRequest, PublicGalleryAuthResponse,
PublicFavoriteRequest, PublicFavoriteResponse,
PublicCommentRequest, PublicCommentResponse,
CreateExportRequest, Export, ExportDownloadResponse,
Job, Paged, UUID
} from "./types";
export interface KiloClientConfig {
baseUrl: string; // e.g. "https://api.kilo.photo/v1"
getAccessToken?: () => string | null;
defaultHeaders?: Record<string, string>;
fetchImpl?: typeof fetch;
}
type RequestOpts = {
idempotencyKey?: string;
headers?: Record<string, string>;
signal?: AbortSignal;
};
export class KiloClient {
private baseUrl: string;
private getAccessToken?: () => string | null;
private defaultHeaders: Record<string, string>;
private fetchImpl: typeof fetch;
constructor(cfg: KiloClientConfig) {
this.baseUrl = cfg.baseUrl.replace(/\/+$/, "");
this.getAccessToken = cfg.getAccessToken;
this.defaultHeaders = cfg.defaultHeaders ?? {};
this.fetchImpl = cfg.fetchImpl ?? fetch;
}
private async request<T>(method: string, path: string, body?: unknown, opts?: RequestOpts): Promise<T> {
const url = `${this.baseUrl}${path}`;
const headers: Record<string, string> = {
"Content-Type": "application/json",
...this.defaultHeaders,
...(opts?.headers ?? {})
};
const token = this.getAccessToken?.();
if (token) headers["Authorization"] = `Bearer ${token}`;
if (opts?.idempotencyKey) headers["Idempotency-Key"] = opts.idempotencyKey;
const res = await this.fetchImpl(url, {
method,
headers,
body: body === undefined ? undefined : JSON.stringify(body),
signal: opts?.signal,
});
const text = await res.text();
const maybeJson = text ? safeJson(text) : undefined;
if (!res.ok) {
throw new KiloApiError(res.status, maybeJson);
}
return maybeJson as T;
}
// AUTH
loginStart(req: LoginStartRequest): Promise<LoginStartResponse> {
return this.request("POST", "/auth/login", req);
}
loginVerify(req: LoginVerifyRequest): Promise<LoginVerifyResponse> {
return this.request("POST", "/auth/verify", req);
}
// PROJECTS
listProjects(params?: { cursor?: string; limit?: number }): Promise<Paged<Project>> {
const q = qs(params);
return this.request("GET", `/projects${q}`);
}
createProject(req: CreateProjectRequest): Promise<Project> {
return this.request("POST", "/projects", req);
}
getProject(projectId: UUID): Promise<Project> {
return this.request("GET", `/projects/${projectId}`);
}
updateProject(projectId: UUID, req: UpdateProjectRequest): Promise<Project> {
return this.request("PATCH", `/projects/${projectId}`, req);
}
archiveProject(projectId: UUID): Promise<Project> {
return this.request("POST", `/projects/${projectId}/archive`);
}
// INGEST
prepareUpload(projectId: UUID, req: PrepareUploadRequest, opts?: RequestOpts): Promise<PrepareUploadResponse> {
return this.request("POST", `/projects/${projectId}/assets:prepareUpload`, req, opts);
}
finalizeUpload(projectId: UUID, req: FinalizeUploadRequest, opts?: RequestOpts): Promise<FinalizeUploadResponse> {
return this.request("POST", `/projects/${projectId}/assets:finalizeUpload`, req, opts);
}
// ASSETS
listAssets(projectId: UUID, params?: { cursor?: string; limit?: number; picked?: boolean; rejected?: boolean; ratingMin?: number }): Promise<Paged<Asset>> {
const q = qs(params);
return this.request("GET", `/projects/${projectId}/assets${q}`);
}
getAsset(assetId: UUID): Promise<Asset> {
return this.request("GET", `/assets/${assetId}`);
}
updateAsset(assetId: UUID, req: UpdateAssetRequest): Promise<Asset> {
return this.request("PATCH", `/assets/${assetId}`, req);
}
getAssetFiles(assetId: UUID): Promise<AssetFilesResponse> {
return this.request("GET", `/assets/${assetId}/files`);
}
setRating(assetId: UUID, req: UpsertRatingRequest): Promise<Rating> {
return this.request("POST", `/assets/${assetId}/ratings`, req);
}
// CLUSTERS
listClusters(projectId: UUID, params?: { kind?: string; cursor?: string; limit?: number }): Promise<Paged<Cluster>> {
const q = qs(params);
return this.request("GET", `/projects/${projectId}/clusters${q}`);
}
getCluster(clusterId: UUID): Promise<Cluster> {
return this.request("GET", `/clusters/${clusterId}`);
}
updateCluster(clusterId: UUID, req: UpdateClusterRequest): Promise<Cluster> {
return this.request("PATCH", `/clusters/${clusterId}`, req);
}
setWinner(clusterId: UUID, req: SetWinnerRequest): Promise<Cluster> {
return this.request("POST", `/clusters/${clusterId}/winner`, req);
}
// CULLING (bulk)
applyCullActions(projectId: UUID, req: BulkCullActionRequest, opts?: RequestOpts): Promise<BulkCullActionResponse> {
return this.request("POST", `/projects/${projectId}/cull:applyAction`, req, opts);
}
// EDITS
listEdits(assetId: UUID): Promise<EditVersionList> {
return this.request("GET", `/assets/${assetId}/edits`);
}
createEdit(assetId: UUID, req: CreateEditVersionRequest): Promise<EditVersion> {
return this.request("POST", `/assets/${assetId}/edits`, req);
}
getEdit(editId: UUID): Promise<EditVersion> {
return this.request("GET", `/edits/${editId}`);
}
applyEditBatch(editId: UUID, req: ApplyEditBatchRequest, opts?: RequestOpts): Promise<ApplyEditBatchResponse> {
return this.request("POST", `/edits/${editId}/applyTo`, req, opts);
}
// SEARCH
search(projectId: UUID, qText: string, limit?: number): Promise<SearchResponse> {
const q = qs({ q: qText, limit });
return this.request("GET", `/projects/${projectId}/search${q}`);
}
advancedSearch(projectId: UUID, req: AdvancedSearchRequest): Promise<SearchResponse> {
return this.request("POST", `/projects/${projectId}/search`, req);
}
// GALLERIES (owner/admin)
createGallery(projectId: UUID, req: CreateGalleryRequest, opts?: RequestOpts): Promise<Gallery> {
return this.request("POST", `/projects/${projectId}/galleries`, req, opts);
}
getGallery(galleryId: UUID): Promise<Gallery> {
return this.request("GET", `/galleries/${galleryId}`);
}
updateGallery(galleryId: UUID, req: UpdateGalleryRequest): Promise<Gallery> {
return this.request("PATCH", `/galleries/${galleryId}`, req);
}
addGalleryAssets(galleryId: UUID, req: AddGalleryAssetsRequest, opts?: RequestOpts): Promise<Gallery> {
return this.request("POST", `/galleries/${galleryId}/assets`, req, opts);
}
removeGalleryAssets(galleryId: UUID, req: RemoveGalleryAssetsRequest): Promise<Gallery> {
return this.request("DELETE", `/galleries/${galleryId}/assets`, req);
}
// PUBLIC SHARE (client)
publicGetGallery(shareSlug: string): Promise<PublicGalleryResponse> {
return this.request("GET", `/share/${encodeURIComponent(shareSlug)}`);
}
publicAuthGallery(shareSlug: string, req: PublicGalleryAuthRequest): Promise<PublicGalleryAuthResponse> {
return this.request("POST", `/share/${encodeURIComponent(shareSlug)}/auth`, req);
}
publicFavorite(shareSlug: string, req: PublicFavoriteRequest): Promise<PublicFavoriteResponse> {
return this.request("POST", `/share/${encodeURIComponent(shareSlug)}/favorite`, req);
}
publicComment(shareSlug: string, req: PublicCommentRequest): Promise<PublicCommentResponse> {
return this.request("POST", `/share/${encodeURIComponent(shareSlug)}/comment`, req);
}
// EXPORTS
createExport(projectId: UUID, req: CreateExportRequest, opts?: RequestOpts): Promise<Export> {
return this.request("POST", `/projects/${projectId}/exports`, req, opts);
}
getExport(exportId: UUID): Promise<Export> {
return this.request("GET", `/exports/${exportId}`);
}
exportDownload(exportId: UUID): Promise<ExportDownloadResponse> {
return this.request("GET", `/exports/${exportId}/download`);
}
// JOBS
listJobs(projectId: UUID, params?: { status?: string; type?: string; cursor?: string; limit?: number }): Promise<Paged<Job>> {
const q = qs(params);
return this.request("GET", `/projects/${projectId}/jobs${q}`);
}
}
// helpers
function safeJson(text: string): any {
try { return JSON.parse(text); } catch { return undefined; }
}
function qs(params?: Record<string, any>): string {
if (!params) return "";
const clean: Record<string, string> = {};
for (const [k, v] of Object.entries(params)) {
if (v === undefined || v === null) continue;
clean[k] = String(v);
}
const s = new URLSearchParams(clean).toString();
return s ? `?${s}` : "";
}
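The layout in 1.1 lists `pagination.ts` but it isn't specified above; a minimal sketch of a cursor helper that works against any `Paged<T>` endpoint could look like this:

```ts
// src/pagination.ts (a sketch; referenced in the layout but not specified above)
import type { Paged } from "./types";

// Iterate every item behind a cursor-paged endpoint, one page at a time.
export async function* paginate<T>(
  fetchPage: (cursor?: string) => Promise<Paged<T>>
): AsyncGenerator<T> {
  let cursor: string | undefined;
  do {
    const page = await fetchPage(cursor);
    for (const item of page.items) yield item;
    cursor = page.pageInfo.nextCursor ?? undefined;
  } while (cursor);
}

// Usage:
// for await (const p of paginate(c => client.listProjects({ cursor: c }))) { ... }
```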
1.5 uploads.ts (signed URL direct upload helper)
// src/uploads.ts
import type { UploadInstruction, UUID } from "./types";
export interface UploadResult {
assetId: UUID;
ok: boolean;
status: number;
}
export async function uploadToSignedUrl(
instr: UploadInstruction,
file: Blob,
fetchImpl: typeof fetch = fetch
): Promise<UploadResult> {
const res = await fetchImpl(instr.uploadUrl, {
method: "PUT",
headers: {
...(instr.headers ?? {}),
// Some S3-style signed URLs require exact headers, so don't add extra unless needed.
},
body: file,
});
return { assetId: instr.assetId, ok: res.ok, status: res.status };
}
// Optional: concurrency-limited bulk uploader
export async function uploadMany(
items: Array<{ instr: UploadInstruction; file: Blob }>,
opts?: { concurrency?: number; fetchImpl?: typeof fetch; onProgress?: (done: number, total: number) => void }
): Promise<UploadResult[]> {
const concurrency = Math.max(1, opts?.concurrency ?? 4);
const fetchImpl = opts?.fetchImpl ?? fetch;
const total = items.length;
let done = 0;
const results: UploadResult[] = [];
const queue = items.slice();
const workers = Array.from({ length: concurrency }, async () => {
while (queue.length) {
const next = queue.shift();
if (!next) break;
const r = await uploadToSignedUrl(next.instr, next.file, fetchImpl);
results.push(r);
done++;
opts?.onProgress?.(done, total);
if (!r.ok) {
// You can choose to throw hard or collect failures
// throw new Error(`Upload failed: ${r.status} for asset ${r.assetId}`);
}
}
});
await Promise.all(workers);
return results;
}
1.6 realtime.ts (SSE wrapper for job progress)
// src/realtime.ts
export type RealtimeEvent =
| { type: "job.progress"; jobId: string; assetId?: string; progress?: number; status?: string; jobType?: string }
| { type: "job.done"; jobId: string; status?: string }
| { type: "cluster.updated"; clusterId: string }
| { type: "asset.updated"; assetId: string }
| { type: "export.updated"; exportId: string }
| { type: "unknown"; raw: any };
export interface RealtimeOptions {
baseUrl: string; // e.g. https://api.kilo.photo/v1
projectId: string;
accessToken?: string;
lastEventId?: string;
onEvent: (ev: RealtimeEvent) => void;
onError?: (err: any) => void;
}
/**
* Browser: use EventSource (cannot set headers), so pass token as query param or cookie session.
* Desktop (Electron): can use fetch streaming with headers if you want.
*/
export function connectSSE(opts: RealtimeOptions): EventSource {
const url = new URL(`${opts.baseUrl.replace(/\/+$/, "")}/realtime`);
url.searchParams.set("projectId", opts.projectId);
if (opts.lastEventId) url.searchParams.set("lastEventId", opts.lastEventId);
if (opts.accessToken) url.searchParams.set("accessToken", opts.accessToken); // alternative: cookie auth
const es = new EventSource(url.toString());
es.addEventListener("job.progress", (e: MessageEvent) => {
opts.onEvent({ type: "job.progress", ...safeParse(e.data) });
});
es.addEventListener("job.done", (e: MessageEvent) => {
opts.onEvent({ type: "job.done", ...safeParse(e.data) });
});
es.addEventListener("cluster.updated", (e: MessageEvent) => {
opts.onEvent({ type: "cluster.updated", ...safeParse(e.data) });
});
es.addEventListener("asset.updated", (e: MessageEvent) => {
opts.onEvent({ type: "asset.updated", ...safeParse(e.data) });
});
es.addEventListener("export.updated", (e: MessageEvent) => {
opts.onEvent({ type: "export.updated", ...safeParse(e.data) });
});
es.onerror = (err) => opts.onError?.(err);
return es;
}
function safeParse(s: string): any {
try { return JSON.parse(s); } catch { return { raw: s }; }
}
Note: For production, prefer cookie-based auth for SSE in the browser (EventSource can't set custom headers). The desktop app can use a fetch-stream approach, sketched below.
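For the desktop path, the fetch-stream variant might look like this (a sketch: a minimal parser that only handles `event:`/`data:` lines, assuming a runtime with web streams such as Electron or Node 18+):

```ts
// Fetch-streamed SSE with an Authorization header (not possible with EventSource).
export async function connectSSEWithHeaders(
  url: string,
  token: string,
  onEvent: (event: string, data: string) => void,
  signal?: AbortSignal
): Promise<void> {
  const res = await fetch(url, {
    headers: { Authorization: `Bearer ${token}`, Accept: "text/event-stream" },
    signal,
  });
  if (!res.ok || !res.body) throw new Error(`SSE connect failed: ${res.status}`);
  const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
  let buf = "";
  let event = "message";
  let data: string[] = [];
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    buf += value;
    let idx: number;
    while ((idx = buf.indexOf("\n")) >= 0) {
      const line = buf.slice(0, idx).replace(/\r$/, "");
      buf = buf.slice(idx + 1);
      if (line === "") {
        // Blank line terminates one SSE frame; dispatch it.
        if (data.length) onEvent(event, data.join("\n"));
        event = "message";
        data = [];
      } else if (line.startsWith("event:")) {
        event = line.slice(6).trim();
      } else if (line.startsWith("data:")) {
        data.push(line.slice(5).trim());
      }
    }
  }
}
```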
1.7 Example usage (frontend / desktop)
import { KiloClient } from "./client";
import { uploadMany } from "./uploads";
const client = new KiloClient({
baseUrl: "https://api.kilo.photo/v1",
getAccessToken: () => localStorage.getItem("kilo_access_token"),
});
async function importShoot(projectId: string, files: File[]) {
// 1) request signed URLs
const prep = await client.prepareUpload(projectId, {
files: files.map((f, i) => ({
clientFileId: String(i),
filename: f.name,
byteSize: f.size,
contentType: f.type || "application/octet-stream",
})),
}, { idempotencyKey: crypto.randomUUID() });
// 2) upload to signed URLs
const uploads = prep.uploads.map(u => ({
instr: u,
file: files[Number(u.clientFileId ?? "0")]!,
}));
const results = await uploadMany(uploads, {
concurrency: 6,
onProgress: (done, total) => console.log(`Uploaded ${done}/${total}`),
});
// 3) finalize upload with checksums (you’d compute sha256 in desktop; browser can use SubtleCrypto)
const finalized = await client.finalizeUpload(projectId, {
assets: results
.filter(r => r.ok)
.map(r => ({
assetId: r.assetId,
checksumSha256: "TODO_SHA256",
})),
}, { idempotencyKey: crypto.randomUUID() });
console.log("Queued jobs:", finalized.queuedJobs);
}
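To fill in the TODO_SHA256 placeholder, a checksum helper using SubtleCrypto might look like this (a sketch; hex encoding is an assumption, so match whatever the backend expects, and note it buffers the whole file in memory, which is fine for browsers but desktop may prefer a streaming hash for big RAW files):

```ts
// SHA-256 of a Blob via SubtleCrypto (browsers and Node 18+).
export async function sha256Hex(file: Blob): Promise<string> {
  const digest = await crypto.subtle.digest("SHA-256", await file.arrayBuffer());
  return Array.from(new Uint8Array(digest))
    .map((b) => b.toString(16).padStart(2, "0"))
    .join("");
}
```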
2) Job Processor Spec (the engine that makes it feel instant)
This is the “AI factory line” that turns uploads into moments + winners + searchable archive.
2.1 Job system goals
- At-least-once processing (safe retries)
- Idempotent tasks (repeatable without duplicating outputs)
- Progressive UX (thumbnails first, intelligence later)
- Debounced clustering (don’t thrash the clusterer per file)
- Priority-aware (delivery/export gets priority when needed)
2.2 Queue choice (pragmatic options)
Pick one:
Option A: Postgres-native queue (fast to ship)
- Store jobs in a `jobs` table
- Workers claim jobs via `SELECT ... FOR UPDATE SKIP LOCKED`
- Pros: simplest infra, consistent transactions
- Cons: heavy throughput may stress Postgres if you go wild
Option B: Redis/BullMQ (great dev velocity)
- Queue in Redis
- Workers pop jobs, report progress to DB
- Pros: robust, common patterns, good rate limiting
- Cons: extra infra
Option C: SQS + DLQ (enterprise-grade)
- Queue in SQS
- Keep job state in DB
- Pros: durable, scalable
- Cons: more moving parts
V1 recommendation: Postgres-native queue (Option A). It's clean, and you already have the DB.
2.3 Job state machine (simple and brutal)
Statuses: `queued`, `running`, `done`, `failed`, `canceled`
Transitions:
- `queued` → `running` (claim)
- `running` → `done` (commit outputs)
- `running` → `failed` (max retries reached)
- `running` → `queued` (retry, with delay)
- any → `canceled` (manual cancel)
2.4 Job payload schema (what’s inside a job)
Store it in a `payload jsonb` column on `jobs` (add it), or derive it from linked tables.
Recommended minimal payload:
{
"traceId": "uuid",
"attempt": 1,
"assetId": "uuid",
"projectId": "uuid",
"type": "compute_embedding",
"inputs": {
"sourceFileKind": "original",
"previewKind": "preview"
}
}
Add:
- `priority` (0..100; lower = higher priority)
- `runAfter` timestamp (for scheduled retries / debounce)
2.5 Idempotency rules (the “never duplicate outputs” law)
Every job MUST be safe if executed twice.
How:
- Unique constraints on outputs:
  - `asset_files (asset_id, kind)` is unique
  - `embeddings (asset_id, model)` is the PK
  - `perceptual_hashes (asset_id)` is the PK
- Upsert outputs: use `INSERT ... ON CONFLICT ... DO UPDATE`
- Write outputs in a transaction:
  - create/update the output rows
  - mark the job done
  - emit events (or record an audit log)
Bonus: Deduplicate jobs
For “same work” jobs, add `jobs.dedupe_key`:
- `generate_thumbnail:{assetId}`
- `compute_embedding:{assetId}:{model}`
- `cluster_project:{projectId}`
A unique index on `dedupe_key`, partial over `status in ('queued', 'running')`, prevents duplicates.
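A sketch of a dedupe-aware enqueue under those assumptions (the index name, `Db` shape, and `run_after` column follow the fields proposed in 2.4; Postgres accepts the partial-index conflict target shown):

```ts
// node-postgres-style client shape (an assumption).
interface Db { query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>; }

// Assumed partial unique index:
//   CREATE UNIQUE INDEX jobs_dedupe_active ON jobs (dedupe_key)
//     WHERE status IN ('queued', 'running');
export async function enqueueJob(
  db: Db,
  job: { type: string; dedupeKey: string; payload: unknown; runAfter?: Date }
): Promise<void> {
  // No-op if an identical job is already queued or running.
  await db.query(
    `INSERT INTO jobs (type, dedupe_key, payload, run_after, status)
     VALUES ($1, $2, $3, $4, 'queued')
     ON CONFLICT (dedupe_key) WHERE status IN ('queued', 'running')
     DO NOTHING`,
    [job.type, job.dedupeKey, JSON.stringify(job.payload), job.runAfter ?? null]
  );
}
```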
2.6 Retry strategy (per job type)
General
- Exponential backoff with jitter
- Don’t retry “bad input” errors (corrupt file, unsupported format)
Suggested defaults:
| Job Type | Max Attempts | Backoff | Notes |
|---|---|---|---|
| extract_exif | 3 | 2s → 10s → 60s | usually cheap |
| generate_thumbnail | 3 | 2s → 10s → 60s | must be fast |
| generate_preview | 4 | 5s → 20s → 2m → 10m | heavier |
| compute_phash | 2 | 10s → 2m | |
| compute_embedding | 3 | 30s → 5m → 30m | GPU or heavy CPU |
| cluster_project | 2 | 1m → 10m | debounce recommended |
| rank_cluster_candidates | 2 | 30s → 5m | |
| detect_faces (opt-in) | 2 | 1m → 10m | privacy gating |
| export_project | 3 | 1m → 10m → 30m | user-facing priority |
If a job fails:
- set `jobs.error`
- set an asset/project “processing issues” badge
- allow manual retry (add an endpoint later: `POST /jobs/{id}/retry`)
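The worker pseudocode in 4.3 calls a `computeBackoff` helper; a minimal sketch with full jitter (the base and cap defaults are placeholders; tune them per job type using the table above):

```ts
// Exponential backoff with full jitter:
// delay is uniform in [0, min(cap, base * 2^(attempt - 1))].
function computeBackoff(attempt: number, baseMs = 2_000, capMs = 30 * 60_000): Date {
  const ceiling = Math.min(capMs, baseMs * 2 ** (attempt - 1));
  return new Date(Date.now() + Math.random() * ceiling);
}
```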
3) Pipeline Chaining (how uploads become “Moments + Winners”)
This is the chain that makes KILO feel like it’s reading your mind.
3.1 Per-asset pipeline (runs on finalizeUpload)
When a file is finalized:
1. `extract_exif`
2. `generate_thumbnail` (tiny + instant)
3. `generate_preview` (smart preview, edit/search base)
4. `compute_phash` (duplicate detection)
5. `compute_embedding` (semantic search + clustering similarity)
6. `compute_signals` (sharpness/blur/exposure warnings; can be folded into the preview job)
Outputs:
- `asset_files.thumbnail`
- `asset_files.preview`
- `perceptual_hashes`
- `embeddings`
- `assets.flags.processingReady = true` when the minimum is ready (thumb + preview + exif)
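A sketch of the fan-out when finalizeUpload lands, reusing the dedupe-aware `enqueueJob` sketched in 2.5 (job names match the list above; the `Db` shape is an assumption):

```ts
interface Db { query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>; }
declare function enqueueJob(
  db: Db,
  job: { type: string; dedupeKey: string; payload: unknown }
): Promise<void>;

const PER_ASSET_JOBS = [
  "extract_exif",
  "generate_thumbnail",
  "generate_preview",
  "compute_phash",
  "compute_embedding",
] as const;

// One job per step; the dedupe key makes re-finalizing the same asset a no-op.
async function enqueuePerAssetPipeline(db: Db, projectId: string, assetId: string) {
  for (const type of PER_ASSET_JOBS) {
    await enqueueJob(db, {
      type,
      dedupeKey: `${type}:${assetId}`, // compute_embedding would also include the model
      payload: { projectId, assetId },
    });
  }
}
```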
3.2 Project-level pipeline (debounced)
You don’t want to recluster 2,000 photos 2,000 times. You want a debounced cluster refresh.
Trigger “cluster refresh” when:
- a batch of embeddings finishes
- or N new assets are ready
- or user hits “Rebuild Moments”
Project jobs:
- `cluster_project`
- `rank_cluster_candidates` (or do ranking inline during clustering)
- emit `cluster.updated` events
Debounce rule (recommended):
- Maintain `projects.cluster_dirty = true` (add the column)
- Schedule `cluster_project` for “now + 30 seconds”
- If more assets finish within 30s, do nothing (a job is already scheduled)
- When the cluster job runs, it clears the dirty flag if no new ready assets arrived mid-run
This gives you:
- fast initial moments
- stable UI (clusters don’t constantly reshuffle)
- fewer compute spikes
4) Worker Execution Model (Postgres-native queue)
4.1 Claiming jobs safely (SKIP LOCKED)
Pattern:
- worker opens a transaction
- selects one job row with `FOR UPDATE SKIP LOCKED`
- marks it running + sets `started_at`
- commits
- does the work
- opens a new transaction
- writes outputs + marks the job done
- commits
Example SQL (conceptual)
-- Claim one job
with next_job as (
  select id
  from jobs
  where status = 'queued'
    and (run_after is null or run_after <= now())
  order by priority asc, created_at asc
  limit 1
  for update skip locked
)
update jobs
set status = 'running',
    attempt = attempt + 1, -- so retry logic can read the current attempt
    updated_at = now(),
    started_at = now()
where id in (select id from next_job)
returning *;
If no row returned → worker sleeps briefly and loops.
4.2 Worker “do work” rules
Rule 1 — Outputs first, then job done
Job completion must be the last commit step.
Rule 2 — Use UPSERT on outputs
So retries don’t duplicate thumbnails/embeddings/etc.
Rule 3 — Report progress
Update `jobs.progress` (0..1) and emit SSE events: `job.progress`, `job.done`
Rule 4 — Never block UI on heavy jobs
UI gets thumbnails early; culling can start with “basic mode” and upgrades.
4.3 Worker pseudocode (the unstoppable loop)
// worker.ts (pseudocode)
while (true) {
const job = await db.claimNextJob(); // atomic claim
if (!job) { await sleep(250); continue; }
try {
await db.setProgress(job.id, 0.05);
switch (job.type) {
case "extract_exif":
await runExtractExif(job);
break;
case "generate_thumbnail":
await runThumbnail(job);
break;
case "generate_preview":
await runPreview(job);
break;
case "compute_phash":
await runPHash(job);
break;
case "compute_embedding":
await runEmbedding(job);
break;
case "cluster_project":
await runClusterProject(job);
break;
case "rank_cluster_candidates":
await runRankClusters(job);
break;
case "export_project":
await runExport(job);
break;
default:
throw new Error(`Unknown job type: ${job.type}`);
}
await db.completeJob(job.id); // mark done
await events.emit(job.projectId, "job.done", { jobId: job.id, status: "done" });
} catch (err: any) {
const decision = classifyError(err); // retryable vs permanent
if (!decision.retryable || job.attempt >= job.maxAttempts) {
await db.failJob(job.id, String(err));
await events.emit(job.projectId, "job.done", { jobId: job.id, status: "failed" });
} else {
const runAfter = computeBackoff(job.attempt);
await db.retryJob(job.id, String(err), runAfter);
await events.emit(job.projectId, "job.progress", { jobId: job.id, status: "queued" });
}
}
}
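The loop above leans on `classifyError`; a minimal sketch (the permanent-error patterns are assumptions; in practice you'd classify by typed error codes rather than message text):

```ts
// Retryable vs permanent: corrupt/unsupported inputs never succeed on retry.
function classifyError(err: unknown): { retryable: boolean } {
  const msg = err instanceof Error ? err.message : String(err);
  const permanent = /corrupt|unsupported format|invalid input|unknown job type/i.test(msg);
  return { retryable: !permanent };
}
```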
5) What each job actually does (outputs + idempotency)
5.1 generate_thumbnail
- Input: original file
- Output: `asset_files (kind=thumbnail)` upsert
- Update: `assets.flags.thumbnailReady = true`
Idempotency: Upsert on (asset_id, kind).
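As a concrete instance of the upsert-then-complete rule, the final commit step might look like this (a sketch; the `asset_files.storage_key` column and `Db` shape are assumptions):

```ts
interface Db { query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>; }

// Output and job completion commit together, so a crashed-and-retried job
// re-runs the upsert instead of duplicating rows.
async function commitThumbnail(db: Db, jobId: string, assetId: string, storageKey: string) {
  await db.query("BEGIN");
  try {
    await db.query(
      `INSERT INTO asset_files (asset_id, kind, storage_key)
       VALUES ($1, 'thumbnail', $2)
       ON CONFLICT (asset_id, kind) DO UPDATE SET storage_key = EXCLUDED.storage_key`,
      [assetId, storageKey]
    );
    await db.query(`UPDATE jobs SET status = 'done', updated_at = now() WHERE id = $1`, [jobId]);
    await db.query("COMMIT");
  } catch (e) {
    await db.query("ROLLBACK");
    throw e;
  }
}
```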
5.2 generate_preview
- Input: original file
- Output: `asset_files (kind=preview)` upsert + histogram stats + maybe basic signals
- Update: `assets.flags.previewReady = true`
Idempotency: Upsert.
5.3 compute_embedding
- Input: preview pixels (preferred), not RAW
- Output: `embeddings (asset_id, model)` upsert
- Update: `assets.flags.embeddingReady = true`
Idempotency: PK (asset_id, model).
5.4 compute_phash
- Input: thumbnail/preview
- Output: `perceptual_hashes (asset_id)` upsert
- Update: `assets.flags.phashReady = true`
5.5 cluster_project
- Input: embeddings for ready assets
- Output:
  - `clusters`
  - `cluster_assets` (ranks set)
  - `cluster.winnerAssetId` set + `whyWinner` chips
- Emit: `cluster.updated`
Idempotency:
- Either: wipe & rebuild in a transaction (safe but heavier)
- Or: incremental clustering (harder)
For V1: wipe & rebuild the project's clusters on each run, but keep the “reviewed clusters locked” behavior by mapping reviewed assets onto the new clusters where possible (optional). If that's too much, just note in the UI that clusters may refine until “processing complete.”
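A wipe-and-rebuild sketch under those constraints (assumes `cluster_assets` cascades on delete from `clusters`; the exact schema is an assumption):

```ts
// node-postgres-style client shape (an assumption).
interface Db { query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>; }

interface BuiltCluster {
  kind: "moment" | "burst" | "duplicate_group";
  winnerAssetId: string | null;
  assetIds: string[]; // already in rank order
}

// V1 idempotency: delete and rebuild inside one transaction, so a retried
// job never leaves half-built clusters behind.
async function rebuildClusters(db: Db, projectId: string, built: BuiltCluster[]) {
  await db.query("BEGIN");
  try {
    await db.query(`DELETE FROM clusters WHERE project_id = $1`, [projectId]);
    for (const c of built) {
      const { rows } = await db.query(
        `INSERT INTO clusters (project_id, kind, winner_asset_id)
         VALUES ($1, $2, $3) RETURNING id`,
        [projectId, c.kind, c.winnerAssetId]
      );
      const clusterId = rows[0].id as string;
      for (let i = 0; i < c.assetIds.length; i++) {
        await db.query(
          `INSERT INTO cluster_assets (cluster_id, asset_id, rank) VALUES ($1, $2, $3)`,
          [clusterId, c.assetIds[i], i + 1]
        );
      }
    }
    await db.query("COMMIT");
  } catch (e) {
    await db.query("ROLLBACK");
    throw e;
  }
}
```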
6) The “Debounced Cluster Refresh” Implementation
Add two fields:
- `projects.cluster_dirty boolean default false`
- `projects.cluster_scheduled_at timestamptz null`
When any asset finishes embedding:
- set `cluster_dirty = true`
- if `cluster_scheduled_at is null or cluster_scheduled_at < now()`:
  - set `cluster_scheduled_at = now() + interval '30 seconds'`
  - enqueue a `cluster_project` job with `run_after = cluster_scheduled_at`
When cluster_project runs:
- snapshot `ready_asset_count`
- build clusters
- set `cluster_dirty = false`
- set `cluster_scheduled_at = null`
- if new assets became ready during the run (count changed):
  - set dirty true again + schedule another run (debounce continues)
This keeps it smooth even on monster imports.
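With the dedupe-aware enqueue from 2.5, the scheduling side collapses to a small helper (a sketch; here the partial unique index on `dedupe_key` stands in for the `cluster_scheduled_at` bookkeeping):

```ts
interface Db { query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>; }
declare function enqueueJob(
  db: Db,
  job: { type: string; dedupeKey: string; payload: unknown; runAfter?: Date }
): Promise<void>;

// Called whenever an asset's embedding finishes.
async function scheduleClusterRefresh(db: Db, projectId: string) {
  await db.query(`UPDATE projects SET cluster_dirty = true WHERE id = $1`, [projectId]);
  // If a cluster_project job is already queued/running this is a no-op;
  // otherwise it lands one scheduled 30 seconds out (the debounce window).
  await enqueueJob(db, {
    type: "cluster_project",
    dedupeKey: `cluster_project:${projectId}`,
    payload: { projectId },
    runAfter: new Date(Date.now() + 30_000),
  });
}
```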
7) Extra Hardcore Add-ons (optional but extremely worth it)
7.1 A “processing readiness” policy
Define progressive readiness levels:
- Cull-ready: thumbnail + preview + basic sharpness
- Search-ready: embedding complete
- Duplicate-ready: phash complete
- AI-ready: clustering + ranking complete
Expose this as:
assets.flags.readiness = "cull" | "search" | "ai"
So UI can gracefully upgrade features instead of lying.
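A sketch of deriving that flag from the per-job readiness flags set in section 5 (the `clustersReady` flag and the extra `"none"` level are assumptions; the spec above doesn't name them):

```ts
type Readiness = "none" | "cull" | "search" | "ai";

// Highest level the asset currently qualifies for; UI unlocks features as it rises.
function computeReadiness(flags: Record<string, unknown>): Readiness {
  if (!flags.thumbnailReady || !flags.previewReady) return "none";
  if (!flags.embeddingReady) return "cull";
  if (!flags.clustersReady) return "search";
  return "ai";
}
```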
7.2 Priority boosting for user actions
If user clicks “Create Gallery” or “Export Now”:
- bump export jobs to higher priority
- optionally pause cluster jobs during export if compute is scarce
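A sketch of the priority bump (priority 0 = highest, per 2.4; the `jobs.project_id` column is an assumption):

```ts
interface Db { query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>; }

// Pull queued export jobs for this project to the front of the queue.
async function boostExports(db: Db, projectId: string) {
  await db.query(
    `UPDATE jobs SET priority = 0
     WHERE project_id = $1 AND type = 'export_project' AND status = 'queued'`,
    [projectId]
  );
}
```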
If you want to go even harder, the next drop is:
- DB migrations (SQL) for missing fields like `payload`, `priority`, `run_after`, `attempt`, `max_attempts`, `dedupe_key`
- Signals spec (sharpness score, blur score, exposure warnings) with thresholds tuned for real-world photography
Say the word and I’ll slam those in too.