1. TypeScript types + client SDK layout (matches the OpenAPI you’ve got)
  2. Direct-to-storage upload helper (signed URL flow)
  3. Job processor spec (queue, retries, idempotency, chaining, debouncing, concurrency)
  4. Worker pseudocode (claim jobs safely, at-least-once without chaos)

Everything below is designed so a small team can ship V1 without drowning.


1) TypeScript Frontend Contract

1.1 Folder layout for the SDK

kilo-sdk/
  src/
    index.ts
    client.ts
    types.ts
    errors.ts
    pagination.ts
    realtime.ts
    uploads.ts
  package.json
  tsconfig.json
  README.md
  • types.ts = request/response types
  • client.ts = fetch wrapper + endpoints
  • uploads.ts = signed upload helper (PUT with headers)
  • realtime.ts = SSE wrapper
  • errors.ts = typed API errors
  • pagination.ts = cursor helpers

1.2 types.ts (core types)

// src/types.ts

export type UUID = string;

export type ISODateTime = string; // "2026-01-14T12:34:56Z"
export type ISODate = string;     // "2026-01-14"

export type Cursor = string;

export interface ApiError {
  code: string;
  message: string;
  details?: Record<string, unknown>;
  requestId?: string;
}

export interface PageInfo {
  nextCursor?: Cursor | null;
}

export interface User {
  id: UUID;
  email: string;
  displayName: string;
  createdAt: ISODateTime;
}

export type ProjectStatus = "active" | "archived" | "deleted";

export interface Project {
  id: UUID;
  studioId?: UUID | null;
  ownerUserId: UUID;
  title: string;
  description?: string | null;
  shootDate?: ISODate | null;
  timezone?: string | null;
  status: ProjectStatus;
  stats?: Record<string, unknown>;
  createdAt: ISODateTime;
  updatedAt: ISODateTime;
}

export interface Paged<T> {
  items: T[];
  pageInfo: PageInfo;
}

export interface CreateProjectRequest {
  title: string;
  description?: string;
  shootDate?: ISODate;
  timezone?: string;
}

export interface UpdateProjectRequest {
  title?: string;
  description?: string | null;
  shootDate?: ISODate | null;
  timezone?: string | null;
  status?: ProjectStatus;
}

export interface LoginStartRequest {
  email: string;
  locale?: string;
}

export interface LoginStartResponse {
  challengeId: string;
  delivery: "email_code" | "magic_link";
}

export interface LoginVerifyRequest {
  challengeId: string;
  code: string;
}

export interface LoginVerifyResponse {
  accessToken: string;
  refreshToken: string;
  user: User;
}

// Assets
export interface Rating {
  assetId: UUID;
  userId: UUID;
  rating?: number | null;   // 1-5
  picked?: boolean | null;
  rejected?: boolean | null;
  starred?: boolean | null;
  notes?: string | null;
  updatedAt?: ISODateTime | null;
}

export interface Asset {
  id: UUID;
  projectId: UUID;
  capturedAt?: ISODateTime | null;
  ingestedAt: ISODateTime;

  widthPx?: number | null;
  heightPx?: number | null;

  cameraMake?: string | null;
  cameraModel?: string | null;
  lensModel?: string | null;

  focalLengthMm?: number | null;
  shutterSpeed?: string | null;
  aperture?: number | null;
  iso?: number | null;

  exif?: Record<string, unknown>;
  iptc?: Record<string, unknown>;

  flags?: Record<string, unknown>;
  myRating?: Rating;

  bestPreviewUrl?: string | null;
}

export interface UpdateAssetRequest {
  iptc?: Record<string, unknown>;
  keywords?: string[];
  notes?: string | null;
  flags?: Record<string, unknown>;
}

export type AssetFileKind = "original" | "thumbnail" | "preview" | "export";

export interface AssetFile {
  kind: AssetFileKind;
  url: string;
  contentType?: string | null;
  byteSize?: number | null;
  checksumSha256?: string | null;
  expiresAt?: ISODateTime | null;
}

export interface AssetFilesResponse {
  files: AssetFile[];
}

export interface UpsertRatingRequest {
  rating?: number | null; // 1-5
  picked?: boolean | null;
  rejected?: boolean | null;
  starred?: boolean | null;
  notes?: string | null;
}

// Upload
export interface UploadFileDescriptor {
  clientFileId?: string | null;
  filename: string;
  byteSize: number;
  contentType?: string | null;
  capturedAt?: ISODateTime | null;
}

export interface PrepareUploadRequest {
  files: UploadFileDescriptor[];
}

export interface UploadInstruction {
  clientFileId?: string | null;
  assetId: UUID;
  uploadUrl: string;
  headers?: Record<string, string>;
  expiresAt?: ISODateTime | null;
}

export interface PrepareUploadResponse {
  uploads: UploadInstruction[];
}

export interface FinalizeUploadRequest {
  assets: Array<{
    assetId: UUID;
    checksumSha256: string;
    contentType?: string | null;
  }>;
}

export interface FinalizeUploadResponse {
  queuedJobs: UUID[];
}

// Clusters
export type ClusterKind = "moment" | "burst" | "duplicate_group";

export interface ClusterAsset {
  assetId: UUID;
  rank: number; // 1..N
  role?: "candidate" | "winner" | "alt" | null;
  signals?: Record<string, unknown>;
}

export interface Cluster {
  id: UUID;
  projectId: UUID;
  kind: ClusterKind;
  title?: string | null;
  startTime?: ISODateTime | null;
  endTime?: ISODateTime | null;
  score?: number | null;
  reviewed?: boolean | null;
  winnerAssetId?: UUID | null;
  whyWinner?: string[];
  assets: ClusterAsset[];
}

export interface UpdateClusterRequest {
  title?: string | null;
  reviewed?: boolean | null;
  manualOverrides?: Record<string, unknown>;
}

export interface SetWinnerRequest {
  winnerAssetId: UUID;
}

// Bulk cull
export interface BulkCullActionRequest {
  actions: Array<{
    assetId: UUID;
    picked?: boolean | null;
    rejected?: boolean | null;
    rating?: number | null;  // 1-5
    starred?: boolean | null;
  }>;
}

export interface BulkCullActionResponse {
  ok: boolean;
  updatedAssetIds?: UUID[];
}

// Edits
export interface EditVersion {
  id: UUID;
  assetId: UUID;
  userId: UUID;
  parentId?: UUID | null;
  name?: string | null;
  params: Record<string, unknown>;
  createdAt: ISODateTime;
}

export interface EditVersionList {
  items: EditVersion[];
}

export interface CreateEditVersionRequest {
  name?: string | null;
  parentId?: UUID | null;
  params: Record<string, unknown>;
}

export interface ApplyEditBatchRequest {
  assetIds: UUID[];
  mode?: "create_versions" | "overwrite_latest";
  name?: string | null;
}

export interface ApplyEditBatchResponse {
  jobId: UUID;
  status: "queued" | "running" | "done";
}

// Search
export interface SearchResponse {
  results: Array<{
    assetId: UUID;
    score: number;
    highlights?: string[];
  }>;
  tookMs?: number;
}

export interface AdvancedSearchRequest {
  q: string;
  filters?: Record<string, unknown>;
}

// Galleries
export interface Gallery {
  id: UUID;
  projectId: UUID;
  title: string;
  shareSlug: string;
  expiresAt?: ISODateTime | null;
  watermark: boolean;
  allowDownloads: boolean;
  requiresPassword: boolean;
  assetCount: number;
  createdAt: ISODateTime;
}

export interface CreateGalleryRequest {
  title: string;
  assetIds: UUID[];
  password?: string | null;
  expiresAt?: ISODateTime | null;
  watermark?: boolean;
  allowDownloads?: boolean;
}

export interface UpdateGalleryRequest {
  title?: string;
  password?: string | null;
  expiresAt?: ISODateTime | null;
  watermark?: boolean;
  allowDownloads?: boolean;
}

export interface AddGalleryAssetsRequest {
  assetIds: UUID[];
}
export interface RemoveGalleryAssetsRequest {
  assetIds: UUID[];
}

export interface PublicGalleryResponse {
  title: string;
  requiresAuth: boolean;
  token?: string | null;
  assets?: Array<{
    assetId: UUID;
    previewUrl: string;
    favoriteCount?: number | null;
    commentsCount?: number | null;
  }>;
}

export interface PublicGalleryAuthRequest {
  password: string;
}

export interface PublicGalleryAuthResponse {
  token: string;
}

export interface PublicFavoriteRequest {
  assetId: UUID;
  favorite: boolean;
}

export interface PublicFavoriteResponse {
  ok: boolean;
}

export interface PublicCommentRequest {
  assetId: UUID;
  text: string;
}

export interface PublicCommentResponse {
  commentId: UUID;
}

// Exports
export type ExportStatus = "queued" | "running" | "done" | "failed";
export type ExportPreset = "full_res" | "web" | "instagram_carousel" | "story_9x16" | "contact_sheet_pdf";

export interface Export {
  id: UUID;
  projectId: UUID;
  preset: ExportPreset;
  settings?: Record<string, unknown>;
  status: ExportStatus;
  progress?: number | null;
  createdAt: ISODateTime;
}

export interface CreateExportRequest {
  preset: ExportPreset;
  assetIds?: UUID[] | null;
  settings?: Record<string, unknown>;
}

export interface ExportDownloadResponse {
  url: string;
  expiresAt?: ISODateTime | null;
}

// Jobs
export type JobStatus = "queued" | "running" | "done" | "failed" | "canceled";

export interface Job {
  id: UUID;
  projectId?: UUID | null;
  assetId?: UUID | null;
  type: string;
  status: JobStatus;
  progress?: number | null;
  error?: string | null;
  createdAt: ISODateTime;
  updatedAt: ISODateTime;
}

1.3 errors.ts (clean error handling)

// src/errors.ts
import type { ApiError } from "./types";

export class KiloApiError extends Error {
  public readonly status: number;
  public readonly body?: ApiError;

  constructor(status: number, body?: ApiError) {
    super(body?.message ?? `API Error (${status})`);
    this.name = "KiloApiError";
    this.status = status;
    this.body = body;
  }
}

export function isKiloApiError(e: unknown): e is KiloApiError {
  return e instanceof KiloApiError;
}

1.4 client.ts (typed API client)

// src/client.ts
import { KiloApiError } from "./errors";
import type {
  LoginStartRequest, LoginStartResponse,
  LoginVerifyRequest, LoginVerifyResponse,
  CreateProjectRequest, UpdateProjectRequest,
  Project, Asset, AssetFilesResponse, Rating,
  PrepareUploadRequest, PrepareUploadResponse,
  FinalizeUploadRequest, FinalizeUploadResponse,
  Cluster, UpdateClusterRequest,
  SetWinnerRequest,
  BulkCullActionRequest, BulkCullActionResponse,
  EditVersionList, CreateEditVersionRequest, EditVersion,
  ApplyEditBatchRequest, ApplyEditBatchResponse,
  SearchResponse, AdvancedSearchRequest,
  CreateGalleryRequest, Gallery, UpdateGalleryRequest,
  AddGalleryAssetsRequest, RemoveGalleryAssetsRequest,
  PublicGalleryResponse, PublicGalleryAuthRequest, PublicGalleryAuthResponse,
  PublicFavoriteRequest, PublicFavoriteResponse,
  PublicCommentRequest, PublicCommentResponse,
  CreateExportRequest, Export, ExportDownloadResponse,
  Job, Paged, UUID
} from "./types";

export interface KiloClientConfig {
  baseUrl: string; // e.g. "https://api.kilo.photo/v1"
  getAccessToken?: () => string | null;
  defaultHeaders?: Record<string, string>;
  fetchImpl?: typeof fetch;
}

type RequestOpts = {
  idempotencyKey?: string;
  headers?: Record<string, string>;
  signal?: AbortSignal;
};

export class KiloClient {
  private baseUrl: string;
  private getAccessToken?: () => string | null;
  private defaultHeaders: Record<string, string>;
  private fetchImpl: typeof fetch;

  constructor(cfg: KiloClientConfig) {
    this.baseUrl = cfg.baseUrl.replace(/\/+$/, "");
    this.getAccessToken = cfg.getAccessToken;
    this.defaultHeaders = cfg.defaultHeaders ?? {};
    this.fetchImpl = cfg.fetchImpl ?? fetch;
  }

  private async request<T>(method: string, path: string, body?: unknown, opts?: RequestOpts): Promise<T> {
    const url = `${this.baseUrl}${path}`;
    const headers: Record<string, string> = {
      "Content-Type": "application/json",
      ...this.defaultHeaders,
      ...(opts?.headers ?? {})
    };

    const token = this.getAccessToken?.();
    if (token) headers["Authorization"] = `Bearer ${token}`;
    if (opts?.idempotencyKey) headers["Idempotency-Key"] = opts.idempotencyKey;

    const res = await this.fetchImpl(url, {
      method,
      headers,
      body: body === undefined ? undefined : JSON.stringify(body),
      signal: opts?.signal,
    });

    const text = await res.text();
    const maybeJson = text ? safeJson(text) : undefined;

    if (!res.ok) {
      throw new KiloApiError(res.status, maybeJson);
    }
    return maybeJson as T;
  }

  // AUTH
  loginStart(req: LoginStartRequest): Promise<LoginStartResponse> {
    return this.request("POST", "/auth/login", req);
  }

  loginVerify(req: LoginVerifyRequest): Promise<LoginVerifyResponse> {
    return this.request("POST", "/auth/verify", req);
  }

  // PROJECTS
  listProjects(params?: { cursor?: string; limit?: number }): Promise<Paged<Project>> {
    const q = qs(params);
    return this.request("GET", `/projects${q}`);
  }

  createProject(req: CreateProjectRequest): Promise<Project> {
    return this.request("POST", "/projects", req);
  }

  getProject(projectId: UUID): Promise<Project> {
    return this.request("GET", `/projects/${projectId}`);
  }

  updateProject(projectId: UUID, req: UpdateProjectRequest): Promise<Project> {
    return this.request("PATCH", `/projects/${projectId}`, req);
  }

  archiveProject(projectId: UUID): Promise<Project> {
    return this.request("POST", `/projects/${projectId}/archive`);
  }

  // INGEST
  prepareUpload(projectId: UUID, req: PrepareUploadRequest, opts?: RequestOpts): Promise<PrepareUploadResponse> {
    return this.request("POST", `/projects/${projectId}/assets:prepareUpload`, req, opts);
  }

  finalizeUpload(projectId: UUID, req: FinalizeUploadRequest, opts?: RequestOpts): Promise<FinalizeUploadResponse> {
    return this.request("POST", `/projects/${projectId}/assets:finalizeUpload`, req, opts);
  }

  // ASSETS
  listAssets(projectId: UUID, params?: { cursor?: string; limit?: number; picked?: boolean; rejected?: boolean; ratingMin?: number }): Promise<Paged<Asset>> {
    const q = qs(params);
    return this.request("GET", `/projects/${projectId}/assets${q}`);
  }

  getAsset(assetId: UUID): Promise<Asset> {
    return this.request("GET", `/assets/${assetId}`);
  }

  updateAsset(assetId: UUID, req: Record<string, unknown>): Promise<Asset> {
    return this.request("PATCH", `/assets/${assetId}`, req);
  }

  getAssetFiles(assetId: UUID): Promise<AssetFilesResponse> {
    return this.request("GET", `/assets/${assetId}/files`);
  }

  setRating(assetId: UUID, req: Record<string, unknown>): Promise<Rating> {
    return this.request("POST", `/assets/${assetId}/ratings`, req);
  }

  // CLUSTERS
  listClusters(projectId: UUID, params?: { kind?: string; cursor?: string; limit?: number }): Promise<Paged<Cluster>> {
    const q = qs(params);
    return this.request("GET", `/projects/${projectId}/clusters${q}`);
  }

  getCluster(clusterId: UUID): Promise<Cluster> {
    return this.request("GET", `/clusters/${clusterId}`);
  }

  updateCluster(clusterId: UUID, req: UpdateClusterRequest): Promise<Cluster> {
    return this.request("PATCH", `/clusters/${clusterId}`, req);
  }

  setWinner(clusterId: UUID, req: SetWinnerRequest): Promise<Cluster> {
    return this.request("POST", `/clusters/${clusterId}/winner`, req);
  }

  // CULLING (bulk)
  applyCullActions(projectId: UUID, req: BulkCullActionRequest, opts?: RequestOpts): Promise<BulkCullActionResponse> {
    return this.request("POST", `/projects/${projectId}/cull:applyAction`, req, opts);
  }

  // EDITS
  listEdits(assetId: UUID): Promise<EditVersionList> {
    return this.request("GET", `/assets/${assetId}/edits`);
  }

  createEdit(assetId: UUID, req: CreateEditVersionRequest): Promise<EditVersion> {
    return this.request("POST", `/assets/${assetId}/edits`, req);
  }

  getEdit(editId: UUID): Promise<EditVersion> {
    return this.request("GET", `/edits/${editId}`);
  }

  applyEditBatch(editId: UUID, req: ApplyEditBatchRequest, opts?: RequestOpts): Promise<ApplyEditBatchResponse> {
    return this.request("POST", `/edits/${editId}/applyTo`, req, opts);
  }

  // SEARCH
  search(projectId: UUID, qText: string, limit?: number): Promise<SearchResponse> {
    const q = qs({ q: qText, limit });
    return this.request("GET", `/projects/${projectId}/search${q}`);
  }

  advancedSearch(projectId: UUID, req: AdvancedSearchRequest): Promise<SearchResponse> {
    return this.request("POST", `/projects/${projectId}/search`, req);
  }

  // GALLERIES (owner/admin)
  createGallery(projectId: UUID, req: CreateGalleryRequest, opts?: RequestOpts): Promise<Gallery> {
    return this.request("POST", `/projects/${projectId}/galleries`, req, opts);
  }

  getGallery(galleryId: UUID): Promise<Gallery> {
    return this.request("GET", `/galleries/${galleryId}`);
  }

  updateGallery(galleryId: UUID, req: UpdateGalleryRequest): Promise<Gallery> {
    return this.request("PATCH", `/galleries/${galleryId}`, req);
  }

  addGalleryAssets(galleryId: UUID, req: AddGalleryAssetsRequest, opts?: RequestOpts): Promise<Gallery> {
    return this.request("POST", `/galleries/${galleryId}/assets`, req, opts);
  }

  removeGalleryAssets(galleryId: UUID, req: RemoveGalleryAssetsRequest): Promise<Gallery> {
    return this.request("DELETE", `/galleries/${galleryId}/assets`, req);
  }

  // PUBLIC SHARE (client)
  publicGetGallery(shareSlug: string): Promise<PublicGalleryResponse> {
    return this.request("GET", `/share/${encodeURIComponent(shareSlug)}`);
  }

  publicAuthGallery(shareSlug: string, req: PublicGalleryAuthRequest): Promise<PublicGalleryAuthResponse> {
    return this.request("POST", `/share/${encodeURIComponent(shareSlug)}/auth`, req);
  }

  publicFavorite(shareSlug: string, req: PublicFavoriteRequest): Promise<PublicFavoriteResponse> {
    return this.request("POST", `/share/${encodeURIComponent(shareSlug)}/favorite`, req);
  }

  publicComment(shareSlug: string, req: PublicCommentRequest): Promise<PublicCommentResponse> {
    return this.request("POST", `/share/${encodeURIComponent(shareSlug)}/comment`, req);
  }

  // EXPORTS
  createExport(projectId: UUID, req: CreateExportRequest, opts?: RequestOpts): Promise<Export> {
    return this.request("POST", `/projects/${projectId}/exports`, req, opts);
  }

  getExport(exportId: UUID): Promise<Export> {
    return this.request("GET", `/exports/${exportId}`);
  }

  exportDownload(exportId: UUID): Promise<ExportDownloadResponse> {
    return this.request("GET", `/exports/${exportId}/download`);
  }

  // JOBS
  listJobs(projectId: UUID, params?: { status?: string; type?: string; cursor?: string; limit?: number }): Promise<Paged<Job>> {
    const q = qs(params);
    return this.request("GET", `/projects/${projectId}/jobs${q}`);
  }
}

// helpers
function safeJson(text: string): any {
  try { return JSON.parse(text); } catch { return undefined; }
}

function qs(params?: Record<string, any>): string {
  if (!params) return "";
  const clean: Record<string, string> = {};
  for (const [k, v] of Object.entries(params)) {
    if (v === undefined || v === null) continue;
    clean[k] = String(v);
  }
  const s = new URLSearchParams(clean).toString();
  return s ? `?${s}` : "";
}

1.5 uploads.ts (signed URL direct upload helper)

// src/uploads.ts
import type { UploadInstruction, UUID } from "./types";

export interface UploadResult {
  assetId: UUID;
  ok: boolean;
  status: number;
}

export async function uploadToSignedUrl(
  instr: UploadInstruction,
  file: Blob,
  fetchImpl: typeof fetch = fetch
): Promise<UploadResult> {
  const res = await fetchImpl(instr.uploadUrl, {
    method: "PUT",
    headers: {
      ...(instr.headers ?? {}),
      // Some S3-style signed URLs require exact headers, so don't add extra unless needed.
    },
    body: file,
  });

  return { assetId: instr.assetId, ok: res.ok, status: res.status };
}

// Optional: concurrency-limited bulk uploader
export async function uploadMany(
  items: Array<{ instr: UploadInstruction; file: Blob }>,
  opts?: { concurrency?: number; fetchImpl?: typeof fetch; onProgress?: (done: number, total: number) => void }
): Promise<UploadResult[]> {
  const concurrency = Math.max(1, opts?.concurrency ?? 4);
  const fetchImpl = opts?.fetchImpl ?? fetch;
  const total = items.length;

  let done = 0;
  const results: UploadResult[] = [];

  const queue = items.slice();
  const workers = Array.from({ length: concurrency }, async () => {
    while (queue.length) {
      const next = queue.shift();
      if (!next) break;
      const r = await uploadToSignedUrl(next.instr, next.file, fetchImpl);
      results.push(r);
      done++;
      opts?.onProgress?.(done, total);
      if (!r.ok) {
        // You can choose to throw hard or collect failures
        // throw new Error(`Upload failed: ${r.status} for asset ${r.assetId}`);
      }
    }
  });

  await Promise.all(workers);
  return results;
}

1.6 realtime.ts (SSE wrapper for job progress)

// src/realtime.ts

export type RealtimeEvent =
  | { type: "job.progress"; jobId: string; assetId?: string; progress?: number; status?: string; jobType?: string }
  | { type: "job.done"; jobId: string; status?: string }
  | { type: "cluster.updated"; clusterId: string }
  | { type: "asset.updated"; assetId: string }
  | { type: "export.updated"; exportId: string }
  | { type: "unknown"; raw: any };

export interface RealtimeOptions {
  baseUrl: string; // e.g. https://api.kilo.photo/v1
  projectId: string;
  accessToken?: string;
  lastEventId?: string;
  onEvent: (ev: RealtimeEvent) => void;
  onError?: (err: any) => void;
}

/**
 * Browser: use EventSource (cannot set headers), so pass token as query param or cookie session.
 * Desktop (Electron): can use fetch streaming with headers if you want.
 */
export function connectSSE(opts: RealtimeOptions): EventSource {
  const url = new URL(`${opts.baseUrl.replace(/\/+$/, "")}/realtime`);
  url.searchParams.set("projectId", opts.projectId);
  if (opts.lastEventId) url.searchParams.set("lastEventId", opts.lastEventId);
  if (opts.accessToken) url.searchParams.set("accessToken", opts.accessToken); // alternative: cookie auth

  const es = new EventSource(url.toString());

  es.addEventListener("job.progress", (e: MessageEvent) => {
    opts.onEvent({ type: "job.progress", ...safeParse(e.data) });
  });
  es.addEventListener("job.done", (e: MessageEvent) => {
    opts.onEvent({ type: "job.done", ...safeParse(e.data) });
  });
  es.addEventListener("cluster.updated", (e: MessageEvent) => {
    opts.onEvent({ type: "cluster.updated", ...safeParse(e.data) });
  });
  es.addEventListener("asset.updated", (e: MessageEvent) => {
    opts.onEvent({ type: "asset.updated", ...safeParse(e.data) });
  });
  es.addEventListener("export.updated", (e: MessageEvent) => {
    opts.onEvent({ type: "export.updated", ...safeParse(e.data) });
  });

  es.onerror = (err) => opts.onError?.(err);

  return es;
}

function safeParse(s: string): any {
  try { return JSON.parse(s); } catch { return { raw: s }; }
}

Note: For production, prefer cookie-based auth for SSE in browser (since EventSource can’t reliably set headers). Desktop app can use a fetch-stream approach.


1.7 Example usage (frontend / desktop)

import { KiloClient } from "./client";
import { uploadMany } from "./uploads";

const client = new KiloClient({
  baseUrl: "https://api.kilo.photo/v1",
  getAccessToken: () => localStorage.getItem("kilo_access_token"),
});

async function importShoot(projectId: string, files: File[]) {
  // 1) request signed URLs
  const prep = await client.prepareUpload(projectId, {
    files: files.map((f, i) => ({
      clientFileId: String(i),
      filename: f.name,
      byteSize: f.size,
      contentType: f.type || "application/octet-stream",
    })),
  }, { idempotencyKey: crypto.randomUUID() });

  // 2) upload to signed URLs
  const uploads = prep.uploads.map(u => ({
    instr: u,
    file: files[Number(u.clientFileId ?? "0")]!,
  }));

  const results = await uploadMany(uploads, {
    concurrency: 6,
    onProgress: (done, total) => console.log(`Uploaded ${done}/${total}`),
  });

  // 3) finalize upload with checksums (you’d compute sha256 in desktop; browser can use SubtleCrypto)
  const finalized = await client.finalizeUpload(projectId, {
    assets: results
      .filter(r => r.ok)
      .map(r => ({
        assetId: r.assetId,
        checksumSha256: "TODO_SHA256",
      })),
  }, { idempotencyKey: crypto.randomUUID() });

  console.log("Queued jobs:", finalized.queuedJobs);
}

2) Job Processor Spec (the engine that makes it feel instant)

This is the “AI factory line” that turns uploads into moments + winners + searchable archive.

2.1 Job system goals

  • At-least-once processing (safe retries)
  • Idempotent tasks (repeatable without duplicating outputs)
  • Progressive UX (thumbnails first, intelligence later)
  • Debounced clustering (don’t thrash the clusterer per file)
  • Priority-aware (delivery/export gets priority when needed)

2.2 Queue choice (pragmatic options)

Pick one:

Option A: Postgres-native queue (fast to ship)

  • Store jobs in jobs table
  • Workers claim jobs via SELECT ... FOR UPDATE SKIP LOCKED
  • Pros: simplest infra, consistent transactions
  • Cons: heavy throughput may stress Postgres if you go wild

Option B: Redis/BullMQ (great dev velocity)

  • Queue in Redis
  • Workers pop jobs, report progress to DB
  • Pros: robust, common patterns, good rate limiting
  • Cons: extra infra

Option C: SQS + DLQ (enterprise-grade)

  • Queue in SQS
  • Keep job state in DB
  • Pros: durable, scalable
  • Cons: more moving parts

V1 recommendation: Postgres-native queue (Option A). It’s clean and you already have DB.


2.3 Job state machine (simple and brutal)

Statuses:

  • queued
  • running
  • done
  • failed
  • canceled

Transitions:

  • queued → running (claim)
  • running → done (commit outputs)
  • running → failed (max retries reached)
  • running → queued (retry, with delay)
  • any → canceled (manual cancel)

2.4 Job payload schema (what’s inside a job)

Store in DB column payload jsonb (add it), or derive from linked tables.

Recommended minimal payload:

{
  "traceId": "uuid",
  "attempt": 1,
  "assetId": "uuid",
  "projectId": "uuid",
  "type": "compute_embedding",
  "inputs": {
    "sourceFileKind": "original",
    "previewKind": "preview"
  }
}

Add:

  • priority (0..100; lower = higher priority)
  • runAfter timestamp (for scheduled retries / debounce)

2.5 Idempotency rules (the “never duplicate outputs” law)

Every job MUST be safe if executed twice.

How:

  • Unique constraints on outputs:
    • asset_files (asset_id, kind) is unique
    • embeddings (asset_id, model) is PK
    • perceptual_hashes (asset_id) is PK
  • Upsert outputs:
    • Use INSERT ... ON CONFLICT ... DO UPDATE
  • Write outputs in a transaction:
    • Create/update the output rows
    • Mark job done
    • Emit events (or record audit log)

Bonus: Deduplicate jobs

For “same work” jobs, add jobs.dedupe_key:

  • generate_thumbnail:{assetId}
  • compute_embedding:{assetId}:{model}
  • cluster_project:{projectId}

Unique index on dedupe_key where status in (queued,running) can prevent duplicates.


2.6 Retry strategy (per job type)

General

  • Exponential backoff with jitter
  • Don’t retry “bad input” errors (corrupt file, unsupported format)

Suggested defaults:

Job TypeMax AttemptsBackoffNotes
extract_exif32s → 10s → 60susually cheap
generate_thumbnail32s → 10s → 60smust be fast
generate_preview45s → 20s → 2m → 10mheavier
compute_phash210s → 2m
compute_embedding330s → 5m → 30mGPU or heavy CPU
cluster_project21m → 10mdebounce recommended
rank_cluster_candidates230s → 5m
detect_faces (opt-in)21m → 10mprivacy gating
export_project31m → 10m → 30muser-facing priority

If a job fails:

  • set jobs.error
  • set asset/project “processing issues” badge
  • allow manual retry (add endpoint later: POST /jobs/{id}/retry)

3) Pipeline Chaining (how uploads become “Moments + Winners”)

This is the chain that makes KILO feel like it’s reading your mind.

3.1 Per-asset pipeline (runs on finalizeUpload)

When a file is finalized:

  1. extract_exif
  2. generate_thumbnail (tiny + instant)
  3. generate_preview (smart preview, edit/search base)
  4. compute_phash (duplicate detection)
  5. compute_embedding (semantic search + clustering similarity)
  6. compute_signals (sharpness/blur/exposure warnings) (you can fold into preview job)

Outputs:

  • asset_files.thumbnail
  • asset_files.preview
  • perceptual_hashes
  • embeddings
  • assets.flags.processingReady = true when minimum is ready (thumb+preview+exif)

3.2 Project-level pipeline (debounced)

You don’t want to recluster 2,000 photos 2,000 times. You want a debounced cluster refresh.

Trigger “cluster refresh” when:

  • a batch of embeddings finishes
  • or N new assets are ready
  • or user hits “Rebuild Moments”

Project jobs:

  1. cluster_project
  2. rank_cluster_candidates (or do ranking inline during clustering)
  3. emit cluster.updated events

Debounce rule (recommended):

  • Maintain projects.cluster_dirty = true (add column)
  • Schedule cluster_project for “now + 30 seconds”
  • If more assets finish within 30s, do nothing (job already scheduled)
  • When cluster job runs, it clears dirty flag if no new ready assets arrived mid-run

This gives you:

  • fast initial moments
  • stable UI (clusters don’t constantly reshuffle)
  • fewer compute spikes

4) Worker Execution Model (Postgres-native queue)

4.1 Claiming jobs safely (SKIP LOCKED)

Pattern:

  • worker opens transaction
  • selects 1 job row FOR UPDATE SKIP LOCKED
  • marks it running + sets started_at
  • commit
  • do work
  • open transaction
  • write outputs + mark job done
  • commit

Example SQL (conceptual)

-- Claim one job
with next_job as (
  select id
  from jobs
  where status = 'queued'
    and (run_after is null or run_after <= now())
  order by priority asc, created_at asc
  for update skip locked
  limit 1
)
update jobs
set status = 'running',
    updated_at = now(),
    started_at = now()
where id in (select id from next_job)
returning *;

If no row returned → worker sleeps briefly and loops.


4.2 Worker “do work” rules

Rule 1 — Outputs first, then job done

Job completion must be the last commit step.

Rule 2 — Use UPSERT on outputs

So retries don’t duplicate thumbnails/embeddings/etc.

Rule 3 — Report progress

Update jobs.progress (0..1) and emit SSE events:

  • job.progress
  • job.done

Rule 4 — Never block UI on heavy jobs

UI gets thumbnails early; culling can start with “basic mode” and upgrades.


4.3 Worker pseudocode (the unstoppable loop)

// worker.ts (pseudocode)

while (true) {
  const job = await db.claimNextJob(); // atomic claim
  if (!job) { await sleep(250); continue; }

  try {
    await db.setProgress(job.id, 0.05);

    switch (job.type) {
      case "extract_exif":
        await runExtractExif(job);
        break;
      case "generate_thumbnail":
        await runThumbnail(job);
        break;
      case "generate_preview":
        await runPreview(job);
        break;
      case "compute_phash":
        await runPHash(job);
        break;
      case "compute_embedding":
        await runEmbedding(job);
        break;
      case "cluster_project":
        await runClusterProject(job);
        break;
      case "rank_cluster_candidates":
        await runRankClusters(job);
        break;
      case "export_project":
        await runExport(job);
        break;
      default:
        throw new Error(`Unknown job type: ${job.type}`);
    }

    await db.completeJob(job.id); // mark done
    await events.emit(job.projectId, "job.done", { jobId: job.id, status: "done" });

  } catch (err: any) {
    const decision = classifyError(err); // retryable vs permanent

    if (!decision.retryable || job.attempt >= job.maxAttempts) {
      await db.failJob(job.id, String(err));
      await events.emit(job.projectId, "job.done", { jobId: job.id, status: "failed" });
    } else {
      const runAfter = computeBackoff(job.attempt);
      await db.retryJob(job.id, String(err), runAfter);
      await events.emit(job.projectId, "job.progress", { jobId: job.id, status: "queued" });
    }
  }
}

5) What each job actually does (outputs + idempotency)

5.1 generate_thumbnail

  • Input: original file
  • Output: asset_files(kind=thumbnail) upsert
  • Update: assets.flags.thumbnailReady = true

Idempotency: Upsert on (asset_id, kind).

5.2 generate_preview

  • Input: original file
  • Output: asset_files(kind=preview) upsert + histogram stats + maybe basic signals
  • Update: assets.flags.previewReady = true

Idempotency: Upsert.

5.3 compute_embedding

  • Input: preview pixels (preferred) not RAW
  • Output: embeddings(asset_id, model) upsert
  • Update: assets.flags.embeddingReady = true

Idempotency: PK (asset_id, model).

5.4 compute_phash

  • Input: thumbnail/preview
  • Output: perceptual_hashes(asset_id) upsert
  • Update: assets.flags.phashReady = true

5.5 cluster_project

  • Input: embeddings for ready assets
  • Output:
    • clusters
    • cluster_assets (rank set)
    • cluster.winnerAssetId set + whyWinner chips
  • Emit: cluster.updated

Idempotency:

  • Either: wipe & rebuild in a transaction (safe but heavier)
  • Or: incremental clustering (harder)
    For V1: wipe & rebuild for project when running, but keep “reviewed clusters locked” behavior by mapping reviewed assets to new clusters if possible (optional). If that’s too much, just mark UI that clusters may refine until “processing complete.”

6) The “Debounced Cluster Refresh” Implementation

Add two fields:

  • projects.cluster_dirty boolean default false
  • projects.cluster_scheduled_at timestamptz null

When any asset finishes embedding:

  1. set cluster_dirty = true
  2. if cluster_scheduled_at is null OR cluster_scheduled_at < now():
    • set cluster_scheduled_at = now() + interval '30 seconds'
    • enqueue cluster_project job with run_after = cluster_scheduled_at

When cluster_project runs:

  • snapshot ready_asset_count
  • build clusters
  • set cluster_dirty = false
  • set cluster_scheduled_at = null
  • if during run new assets became ready (count changed):
    • set dirty true again + schedule another run (debounce continues)

This keeps it smooth even on monster imports.


7) Extra Hardcore Add-ons (optional but extremely worth it)

7.1 A “processing readiness” policy

Define two readiness levels:

  • Cull-ready: thumbnail + preview + basic sharpness
  • Search-ready: embedding complete
  • Duplicate-ready: phash complete
  • AI-ready: clustering + ranking complete

Expose this as:

  • assets.flags.readiness = "cull" | "search" | "ai"

So UI can gracefully upgrade features instead of lying.

7.2 Priority boosting for user actions

If user clicks “Create Gallery” or “Export Now”:

  • bump export jobs to higher priority
  • optionally pause cluster jobs during export if compute is scarce

If you want to go even harder, the next drop is:

  • DB migrations (SQL) for missing fields like payloadpriorityrun_afterattemptmax_attemptsdedupe_key
  • Exact clustering algorithm spec (time windowing + embedding similarity + burst detection)
  • Signals spec (sharpness score, blur score, exposure warnings) with thresholds tuned for real-world photography

Say the word and I’ll slam those in too.