Backend Engineering Assessment
Seven practical questions covering pagination, multi-tenancy, data integrity, Odoo ORM,
PostgreSQL, FastAPI, and CI/CD.
Find what is wrong with each code snippet and select the correct answer.
Q1 — Cursor-Based Pagination
You are building a REST API endpoint that allows an external sync service to pull a large list of records page by page. The endpoint uses cursor-based pagination instead of offset-based to avoid duplicate or skipped records when data changes between requests. The cursor encodes the position of the last record seen on the previous page. Records are always returned sorted by (updated_at ASC, id ASC).
```python
import base64
from datetime import datetime
from typing import Optional


def encode_cursor(updated_at: datetime, record_id: int) -> str:
    dt_str = updated_at.isoformat() if updated_at else ""
    raw = f"{dt_str}|{record_id}"
    return base64.urlsafe_b64encode(raw.encode()).decode()


def decode_cursor(cursor: str):
    try:
        raw = base64.urlsafe_b64decode(cursor.encode()).decode()
        dt_str, id_str = raw.split("|")  # line A
        updated_at = datetime.fromisoformat(dt_str) if dt_str else None
        return updated_at, int(id_str)
    except Exception as exc:
        raise ValueError("Invalid cursor") from exc


def build_cursor_filter(updated_at: Optional[datetime], record_id: int) -> dict:
    if updated_at:
        return {
            "updated_at__gte": updated_at,  # line B
            "id__gt": record_id,            # line C
        }
    return {"id__gt": record_id}


def get_page(db, table, base_filter, cursor, page_size):
    filters = dict(base_filter)
    if cursor:
        cur_date, cur_id = decode_cursor(cursor)
        filters.update(build_cursor_filter(cur_date, cur_id))
    rows = db.query(table).filter(**filters) \
        .order_by("updated_at", "id").limit(page_size).all()  # line D
    has_more = len(rows) > page_size  # line E
    rows = rows[:page_size]
    next_cursor = encode_cursor(rows[-1].updated_at, rows[-1].id) \
        if (has_more and rows) else None
    return {"items": rows, "has_more": has_more, "next_cursor": next_cursor}
```
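As a sketch of the corrected logic (plain Python over an in-memory list, since the `db.query` API above is only pseudocode): tuple comparison encodes the strict OR condition the cursor filter needs, and a `page_size + 1` probe detects further pages.

```python
from datetime import datetime, timedelta


def get_page(rows, cursor, page_size):
    """Keyset pagination over rows sorted by (updated_at, id).

    `cursor` is a (updated_at, id) tuple or None. Python tuple comparison
    encodes the OR condition: updated_at > cur_dt
    OR (updated_at == cur_dt AND id > cur_id).
    """
    rows = sorted(rows, key=lambda r: (r["updated_at"], r["id"]))
    if cursor:
        rows = [r for r in rows if (r["updated_at"], r["id"]) > cursor]
    probe = rows[:page_size + 1]        # fetch one extra row as a probe
    has_more = len(probe) > page_size   # extra row present => more pages exist
    page = probe[:page_size]
    next_cursor = ((page[-1]["updated_at"], page[-1]["id"])
                   if (has_more and page) else None)
    return page, has_more, next_cursor


# The record with a newer timestamp but a lower ID is now kept,
# unlike with the gte/AND filter in the snippet above.
t0 = datetime(2025, 1, 1)
rows = [
    {"id": 100, "updated_at": t0},
    {"id": 42, "updated_at": t0 + timedelta(seconds=1)},
    {"id": 7, "updated_at": t0 + timedelta(seconds=2)},
]
page, has_more, cur = get_page(rows, cursor=(t0, 100), page_size=1)
assert [r["id"] for r in page] == [42] and has_more
```

In a real database the same tuple condition is written as `(updated_at, id) > (:cur_dt, :cur_id)` (PostgreSQL row comparison) or expanded into the explicit OR shown in the answer below the snippet.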
- Line A: `urlsafe_b64encode` already avoids `+` characters, but the raw format is `datetime|id`, so the decoder should use `raw.rsplit("|", 1)`: splitting from the right ensures the ID is cleanly extracted even if the datetime portion ever contains a pipe. `split("|", 1)` splits from the left, which would put extra content in the wrong variable.
- Lines B/C: the filter must mirror the sort order `(updated_at ASC, id ASC)`. A record with a newer timestamp but a lower ID (e.g. `id=42, updated_at=T+1` after cursor `id=100, updated_at=T`) is silently dropped by the AND filter because 42 ≤ 100. The correct logic is an OR: either the timestamp is strictly greater, or the timestamp is equal and the ID is strictly greater.
- Lines D/E: `.limit(page_size)` returns at most `page_size` rows, so `len(rows) > page_size` is always False and the caller thinks there are no more pages after the first one. Fix: fetch `page_size + 1` rows as a probe, check whether the extra row exists, then trim.

Q2 — Tenant Data Isolation
You are working on a multi-tenant SaaS application. Each tenant is identified by a unique tenant_id. All data belongs to exactly one tenant and must never be visible to another. Each request is authenticated and the resolved tenant_id is passed down to the data layer. A helper function can_access_tenant(user, tenant_id) checks whether the authenticated user is permitted to work with the given tenant.
```python
from typing import Optional


def can_access_tenant(user, tenant_id: int) -> bool:
    return tenant_id in user.allowed_tenant_ids


def build_contact_filter(
    role: str,
    contact_id: Optional[int] = None,
    modified_since: Optional[str] = None,
    external_id: Optional[str] = None,
) -> dict:
    filters = {
        "role": role,  # line A
    }
    if contact_id:
        filters["id"] = contact_id
    if modified_since:
        filters["updated_at__gte"] = modified_since
    if external_id:
        filters["external_id"] = external_id
    return filters


def list_contacts(db, user, tenant_id, role, ...):
    if not can_access_tenant(user, tenant_id):  # line B
        raise PermissionError("Access denied")
    filters = build_contact_filter(role=role, ...)
    rows = db.query("contacts").filter(**filters).limit(page_size + 1).all()
    has_more = len(rows) > page_size
    rows = rows[:page_size]
    return {"items": rows, "has_more": has_more}
```
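One way to make tenant scoping unavoidable is to require `tenant_id` in the shared builder itself. A minimal sketch (same function name and filter keys as the snippet above):

```python
from typing import Optional


def build_contact_filter(
    tenant_id: int,                    # required: every query is tenant-scoped
    role: str,
    contact_id: Optional[int] = None,
    modified_since: Optional[str] = None,
    external_id: Optional[str] = None,
) -> dict:
    filters = {
        "tenant_id": tenant_id,        # always applied, regardless of caller
        "role": role,
    }
    if contact_id:
        filters["id"] = contact_id
    if modified_since:
        filters["updated_at__gte"] = modified_since
    if external_id:
        filters["external_id"] = external_id
    return filters
```

Because `tenant_id` is a required positional parameter, a caller that forgets it fails loudly with a `TypeError` at call time instead of silently returning cross-tenant rows.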
- Line A: `build_contact_filter` never adds a `tenant_id` condition, so the query hits the `contacts` table unscoped and returns records from all tenants. The `can_access_tenant` check at line B verifies that the user may act on the tenant, but nothing restricts the rows actually fetched.
- The fix belongs in the builder: `build_contact_filter` is a shared query builder used by `list_contacts`, `get_contact`, `update_contact`, etc. Fixing it there means every caller automatically gets tenant scoping, and the `tenant_id` filter is always applied regardless of context, providing defense in depth.

Q3 — Atomic Write Operations
You are building an integration layer that receives contact records from an external sync service. When a contact is created, the system must perform two writes in sequence: (1) insert the contact into the local database, (2) write the external metadata (external_id, provider, synced_at). The external_id is the key used to deduplicate future sync calls. The sync service automatically retries any request that returns a 5xx response.
```python
def find_existing_contact(db, tenant_id, external_id):
    return db.query("contacts").filter(
        tenant_id=tenant_id, external_id=external_id).first()


def create_contact(db, tenant_id, payload):
    external_id = payload.get("external_id")
    provider = payload.get("provider")
    synced_at = payload.get("synced_at")

    # Deduplicate: reject if this external_id already exists
    if external_id and find_existing_contact(db, tenant_id, external_id):
        raise ValueError("Contact already exists")

    # Step 1: insert the core contact record
    contact = db.insert("contacts", {
        "tenant_id": tenant_id,
        "name": payload.get("name"),
        "email": payload.get("email"),
    })
    # contact.id is now assigned. external_id is NOT yet written.

    # Step 2: write external metadata
    db.update("contacts", contact.id, {  # line A
        "external_id": external_id,
        "provider": provider,
        "synced_at": synced_at,
    })
    return db.query("contacts").filter(id=contact.id).first()
```
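The single-transaction fix can be illustrated with `sqlite3` standing in for the data layer (the `db.insert`/`db.update` helpers above are hypothetical, so this sketch uses raw SQL): both the contact row and its external metadata are written in one statement inside one transaction, and a unique constraint makes the dedup check retry-safe even under races.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE contacts (
        id          INTEGER PRIMARY KEY,
        tenant_id   INTEGER NOT NULL,
        name        TEXT,
        email       TEXT,
        external_id TEXT,
        UNIQUE (tenant_id, external_id)  -- DB-level dedup, survives races
    )
""")


def create_contact(conn, tenant_id, payload):
    # One transaction: either the row exists WITH its external_id,
    # or nothing was written. No orphan with external_id=NULL can persist.
    with conn:  # commits on success, rolls back on any exception
        cur = conn.execute(
            "INSERT INTO contacts (tenant_id, name, email, external_id) "
            "VALUES (?, ?, ?, ?)",
            (tenant_id, payload.get("name"), payload.get("email"),
             payload.get("external_id")),
        )
    return cur.lastrowid


cid = create_contact(conn, 1, {"name": "Ada", "external_id": "CRM-99"})
try:
    create_contact(conn, 1, {"name": "Ada again", "external_id": "CRM-99"})
except sqlite3.IntegrityError:
    pass  # the retried sync call is rejected instead of creating a duplicate
```

The same shape applies in PostgreSQL: a unique index on `(tenant_id, external_id)` plus a single transaction (or savepoint) around both writes.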
- Line A: if the process crashes or the update fails between steps 1 and 2, the contact row is committed with `external_id=NULL`. The update failed, so the metadata was never written. On retry, `find_existing_contact` searches for `external_id="CRM-99"` and finds nothing, so a second row is created, producing a duplicate orphan.
- The two writes must be atomic: on any failure, the database should look as if `create_contact` was never called.
- Compensating with a `try/except` that deletes the contact on failure is not enough. Problem 1: `db.delete` may fail for the same reason, leaving the orphan. Problem 2: between insert and delete, the orphan row is visible to concurrent sessions. A savepoint keeps the insert invisible until the full block commits.

Q4 — Odoo ORM / Model Inheritance
An Odoo 17 module defines a model to track project budgets. The developer extended the existing project.project model with computed fields and a record rule for company-level isolation. Review the code for correctness.
```python
from odoo import models, fields, api


class ProjectBudget(models.Model):
    _name = 'project.budget'        # line A
    _inherit = 'project.project'    # line B

    budget = fields.Float(string='Budget')
    spent = fields.Float(string='Spent')
    remaining = fields.Float(
        string='Remaining',
        compute='_compute_remaining',
        store=True,
    )

    @api.onchange('budget')  # line C
    def _compute_remaining(self):
        for rec in self:
            rec.remaining = rec.budget - rec.spent
```
```xml
<!-- Record rule XML -->
<record model="ir.rule" id="rule_budget_own_company">
    <field name="name">Budget: own company</field>
    <field name="model_id" ref="model_project_budget"/>
    <field name="domain_force">
        [('company_id','=',user.company_id.id)]
    </field>
    <field name="groups" eval="[]"/>  <!-- line D -->
</record>
```
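For contrast, the in-place extension might look like the following sketch. It is not runnable outside an Odoo 17 addon environment and is shown only to illustrate the `_inherit` and `@api.depends` fixes discussed below:

```python
from odoo import models, fields, api


class ProjectProject(models.Model):
    _inherit = 'project.project'   # extend in place: no _name, no new table

    budget = fields.Float(string='Budget')
    spent = fields.Float(string='Spent')
    remaining = fields.Float(
        string='Remaining',
        compute='_compute_remaining',
        store=True,
    )

    # depends (not onchange) so the stored field recomputes on every
    # write/create path: forms, background jobs, XML-RPC alike.
    @api.depends('budget', 'spent')
    def _compute_remaining(self):
        for rec in self:
            rec.remaining = rec.budget - rec.spent
```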
- Lines A/B: when `_name` differs from `_inherit`, Odoo creates a brand new model with its own table (prototype inheritance). The new model copies all fields from the parent but stores data separately, and existing `project.project` records do NOT gain the budget fields. To extend in place, use `_inherit = 'project.project'` without redefining `_name`.
- Line C: `@api.onchange` only triggers during form interactions in the web client. It does NOT trigger on direct ORM `write()`/`create()` calls, background jobs, or XML-RPC. A stored computed field must use `@api.depends('budget', 'spent')` so the ORM recomputes whenever either dependency changes.
- Line D: a rule with no `groups` is a global rule. Unlike group-specific rules (which combine additively), global rules are enforced for ALL regular users and are ANDed with every other applicable rule. If any `project.budget` record has `company_id = False`, the domain makes it completely invisible to everyone.

Q5 — PostgreSQL Query & Migration
A backend team maintains a transactions table with ~8 million rows. They need to optimize a slow query and apply a schema migration. Review both operations.
```sql
-- Table definition:
CREATE TABLE transactions (
    id         BIGSERIAL PRIMARY KEY,
    tenant_id  UUID NOT NULL,
    status     VARCHAR(20) NOT NULL DEFAULT 'pending',
    amount     NUMERIC(12,2) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Existing index:
CREATE INDEX idx_transactions_tenant
    ON transactions (tenant_id, created_at);

-- Slow query (takes 4.2s):
SELECT id, amount, created_at
FROM transactions
WHERE tenant_id = '...'
  AND status = 'completed'            -- line A
  AND created_at >= '2025-01-01'
ORDER BY created_at DESC
LIMIT 50;

-- EXPLAIN ANALYZE output (excerpt):
--   Sort (cost=28451..28452 rows=50)
--     -> Filter (rows=50, rows removed by filter: 184720)   -- line B
--       -> Index Scan using idx_transactions_tenant
--          (rows=184770)

-- Migration script:
ALTER TABLE transactions ADD COLUMN notes TEXT NOT NULL;     -- line C
```
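The seek-vs-filter effect of the composite index can be demonstrated with `sqlite3` as a stand-in (planner output differs from PostgreSQL, but the principle of putting equality columns before the range column is the same; the index name here is an assumption):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE transactions (
        id INTEGER PRIMARY KEY, tenant_id TEXT NOT NULL,
        status TEXT NOT NULL, amount REAL NOT NULL, created_at TEXT NOT NULL
    )
""")
# Composite index: equality columns (tenant_id, status) first,
# then the range/sort column (created_at).
conn.execute("""
    CREATE INDEX idx_transactions_tenant_status
        ON transactions (tenant_id, status, created_at)
""")

plan = conn.execute("""
    EXPLAIN QUERY PLAN
    SELECT id, amount, created_at FROM transactions
    WHERE tenant_id = 't1' AND status = 'completed'
      AND created_at >= '2025-01-01'
    ORDER BY created_at DESC LIMIT 50
""").fetchall()

# The plan seeks the index directly instead of scanning and filtering.
assert any("USING INDEX idx_transactions_tenant_status" in row[3]
           for row in plan)
```

In PostgreSQL, `EXPLAIN ANALYZE` on the same query would show an `Index Scan` with no `Rows Removed by Filter`, and the `Sort` node disappears because the index already delivers `created_at` order.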
- Lines A/B: the existing index on `(tenant_id, created_at)` lets PostgreSQL find rows for a given tenant within a date range, but it knows nothing about `status`. The database fetches all 184,770 matching rows, checks `status = 'completed'` on each, and discards 99.97% of them. A composite index on `(tenant_id, status, created_at)` allows seeking directly to the right status.
- Line C: adding a `NOT NULL` column without a `DEFAULT` to a populated table fails with `ERROR: column contains null values`. With a `DEFAULT`, on PostgreSQL 11+ the value is written to catalog metadata without touching the rows: nearly instant, with no long table rewrite or lock.

Q6 — FastAPI + Cloud Function
A Google Cloud Function (Python, HTTP-triggered) wraps a FastAPI app. It calls an external payment API and returns the result. Review the code for correctness and security.
```python
import os

import functions_framework
import httpx
from fastapi import FastAPI, Request
from mangum import Mangum

app = FastAPI()


@app.post("/charge")
async def charge(request: Request):
    body = await request.json()
    amount = body["amount"]
    token = body["payment_token"]
    api_key = os.environ.get("PAYMENT_API_KEY")

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.payments.example.com/v1/charges",
            json={"amount": amount, "source": token},
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30,
        )

    if resp.status_code != 200:
        return {  # line B
            "error": True,
            "detail": resp.text,
            "headers": dict(resp.headers),
        }
    return {"success": True, "charge_id": resp.json()["id"]}


@app.post("/refund")
def refund(request: Request):  # line C
    body = request.json()      # line C (cont.)
    # ... process refund ...
    return {"success": True}


@functions_framework.http
def main(request):
    handler = Mangum(app)
    return handler(request)
```
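The missing-`await` pitfall at line C can be reproduced without FastAPI. Here a hypothetical `FakeRequest` stands in for Starlette's `Request`, whose `.json()` is likewise an `async` method:

```python
import asyncio
import inspect


class FakeRequest:
    """Stand-in for starlette.requests.Request: json() is async."""
    async def json(self):
        return {"amount": 10}


req = FakeRequest()

# Missing await (as in the sync `refund` handler): you get a coroutine
# object, not a dict, and any dict-style access on it raises TypeError.
result = req.json()
assert inspect.iscoroutine(result)
result.close()  # suppress the "coroutine was never awaited" warning

# Awaited inside an event loop (asyncio.run here, the FastAPI worker
# loop in production): the parsed body.
body = asyncio.run(req.json())
assert body["amount"] == 10
```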
- Line B: on upstream failure the handler returns the error payload with HTTP 200 and `{"error": True}`, so clients checking HTTP status codes think the request succeeded; it also leaks the raw upstream response body and headers. It should return a 502 with a generic error message.
- Line C: `refund` is a sync `def`, but `Request.json()` is an async method that returns a coroutine. Calling it without `await` returns the coroutine object itself, not the parsed JSON. Any dictionary access on this coroutine raises `TypeError`. Fix: `async def refund()` and `body = await request.json()`.

Q7 — CI/CD: GitHub Actions + Cloud Run
A team deploys a multi-service application. The CI/CD pipeline runs checks on PRs and auto-deploys on merge to main. The deployment workflow deploys interdependent Cloud Run services in sequence. Review both workflows.
```yaml
# deploy.yml
name: Deploy to Cloud Run
on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - id: auth
        uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: ${{ secrets.WIF_PROVIDER }}
          service_account: ${{ secrets.WIF_SERVICE_ACCOUNT }}

      - name: Deploy API service
        run: |
          gcloud run deploy api-service \
            --image=europe-west1-docker.pkg.dev/proj/repo/api:${{ github.sha }} \
            --region=europe-west1

      - name: Deploy worker service
        run: |
          WORKER_URL=$(gcloud run services describe worker-service \
            --region=europe-west1 --format='value(status.url)')            # line B
          gcloud run deploy worker-service \
            --image=europe-west1-docker.pkg.dev/proj/repo/worker:${{ github.sha }} \
            --region=europe-west1 \
            --set-env-vars=API_URL=https://api-service-xxxxx.run.app       # line C
```
```yaml
# pr-check.yml
name: PR Checks
on:
  pull_request:
    branches: [main]

jobs:
  deploy-preview:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - id: auth
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}   # line D

      - name: Deploy preview
        run: |
          gcloud run deploy preview-${{ github.event.pull_request.number }} \
            --image=europe-west1-docker.pkg.dev/proj/repo/api:${{ github.sha }} \
            --region=europe-west1
```
- Line B: `WORKER_URL` is captured from `worker-service` but never used anywhere in the step, and it is queried before the new worker revision is even deployed.
- Line C: the worker's `API_URL` is hardcoded to a placeholder URL instead of the address of the `api-service` that was just deployed. Capture it dynamically: `API_URL=$(gcloud run services describe api-service --format='value(status.url)')`.
- Line D: the PR workflow authenticates with a long-lived service-account JSON key (`credentials_json`), while the deploy workflow already uses keyless Workload Identity Federation; the exported key is a standing credential that can leak, whereas WIF tokens are short-lived.
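A corrected "Deploy worker service" step might look like the following sketch (same service names, image path, and region as the workflow above; the API URL is captured at deploy time instead of being hardcoded, and the stray `WORKER_URL` lookup is dropped):

```yaml
      - name: Deploy worker service
        run: |
          # Ask Cloud Run for the URL of the just-deployed api-service
          API_URL=$(gcloud run services describe api-service \
            --region=europe-west1 --format='value(status.url)')
          gcloud run deploy worker-service \
            --image=europe-west1-docker.pkg.dev/proj/repo/worker:${{ github.sha }} \
            --region=europe-west1 \
            --set-env-vars=API_URL=${API_URL}
```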