Backend Engineering Assessment

Seven practical coding questions. All must be answered correctly to pass.

The questions cover pagination, multi-tenancy, data integrity, Odoo ORM, PostgreSQL, FastAPI, and CI/CD.
Find what is wrong with each code snippet and select the correct answer.

Medium

Q1 — Cursor-Based Pagination

You are building a REST API endpoint that allows an external sync service to pull a large list of records page by page. The endpoint uses cursor-based pagination instead of offset-based to avoid duplicate or skipped records when data changes between requests. The cursor encodes the position of the last record seen on the previous page. Records are always returned sorted by (updated_at ASC, id ASC).

import base64
from datetime import datetime
from typing import Optional


def encode_cursor(updated_at: datetime, record_id: int) -> str:
    dt_str = updated_at.isoformat() if updated_at else ""
    raw = f"{dt_str}|{record_id}"
    return base64.urlsafe_b64encode(raw.encode()).decode()


def decode_cursor(cursor: str):
    try:
        raw = base64.urlsafe_b64decode(cursor.encode()).decode()
        dt_str, id_str = raw.split("|")                     # line A
        updated_at = datetime.fromisoformat(dt_str) if dt_str else None
        return updated_at, int(id_str)
    except Exception as exc:
        raise ValueError("Invalid cursor") from exc


def build_cursor_filter(updated_at: Optional[datetime], record_id: int) -> dict:
    if updated_at:
        return {
            "updated_at__gte": updated_at,                  # line B
            "id__gt": record_id,                            # line C
        }
    return {"id__gt": record_id}


def get_page(db, table, base_filter, cursor, page_size):
    filters = dict(base_filter)
    if cursor:
        cur_date, cur_id = decode_cursor(cursor)
        filters.update(build_cursor_filter(cur_date, cur_id))

    rows = db.query(table).filter(**filters) \
             .order_by("updated_at", "id").limit(page_size).all()
    # line D                                      ^^^^^^^^^^

    has_more = len(rows) > page_size                        # line E
    rows = rows[:page_size]

    next_cursor = encode_cursor(rows[-1].updated_at, rows[-1].id) \
                  if (has_more and rows) else None
    return {"items": rows, "has_more": has_more, "next_cursor": next_cursor}
Line A: rsplit("|", 1) is the safe choice. The record ID is always the last segment, so splitting once from the right extracts it cleanly even if the datetime portion ever contained a pipe. split("|") with no maxsplit breaks the two-variable unpack when extra separators appear, and split("|", 1) from the left would put any extra content into the ID variable.
Lines B/C: Records are sorted (updated_at ASC, id ASC). A record with a newer timestamp but a lower ID (e.g. id=42, updated_at=T+1 after cursor id=100, updated_at=T) is silently dropped by the AND filter because 42 ≤ 100. The correct logic is an OR: either the timestamp is strictly greater, or the timestamp is equal and the ID is strictly greater.
Lines D/E: .limit(page_size) returns at most page_size rows, so len(rows) > page_size is always False and the caller thinks there are no more pages after the first one. Fix: fetch page_size + 1 rows as a probe, check whether the extra row exists, then trim.
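The two fixes can be sketched in plain Python, filtering in-memory dicts in place of ORM rows (the row shape and helper names here are illustrative, not the original API):

```python
from datetime import datetime, timedelta

def after_cursor(row, cur_date, cur_id):
    # Keyset condition for (updated_at ASC, id ASC): strictly newer timestamp,
    # OR the same timestamp with a strictly higher id.
    return (row["updated_at"] > cur_date
            or (row["updated_at"] == cur_date and row["id"] > cur_id))

def get_page(rows, cur_date, cur_id, page_size):
    # Fetch one extra row as a probe so has_more can actually become True.
    matching = sorted(
        (r for r in rows if after_cursor(r, cur_date, cur_id)),
        key=lambda r: (r["updated_at"], r["id"]),
    )[:page_size + 1]
    has_more = len(matching) > page_size
    return matching[:page_size], has_more

t = datetime(2025, 1, 1)
rows = [
    {"id": 100, "updated_at": t},                        # the cursor row itself
    {"id": 42, "updated_at": t + timedelta(seconds=1)},  # lower id, newer time
    {"id": 101, "updated_at": t},                        # same time, higher id
]
page, has_more = get_page(rows, cur_date=t, cur_id=100, page_size=5)
print([r["id"] for r in page])   # [101, 42]: id 42 is no longer dropped
print(has_more)                  # False
```

With the original AND filter, the row with id=42 would have been skipped; the OR condition recovers it, and the probe row makes has_more reliable on full pages.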
Hard

Q2 — Tenant Data Isolation

You are working on a multi-tenant SaaS application. Each tenant is identified by a unique tenant_id. All data belongs to exactly one tenant and must never be visible to another. Each request is authenticated and the resolved tenant_id is passed down to the data layer. A helper function can_access_tenant(user, tenant_id) checks whether the authenticated user is permitted to work with the given tenant.

from typing import Optional


def can_access_tenant(user, tenant_id: int) -> bool:
    return tenant_id in user.allowed_tenant_ids


def build_contact_filter(
    role: str,
    contact_id: Optional[int] = None,
    modified_since: Optional[str] = None,
    external_id: Optional[str] = None,
) -> dict:
    filters = {
        "role": role,                       # line A
    }
    if contact_id:   filters["id"] = contact_id
    if modified_since: filters["updated_at__gte"] = modified_since
    if external_id:  filters["external_id"] = external_id
    return filters


def list_contacts(db, user, tenant_id, role, ...):
    if not can_access_tenant(user, tenant_id):      # line B
        raise PermissionError("Access denied")

    filters = build_contact_filter(role=role, ...)
    rows = db.query("contacts").filter(**filters).limit(page_size + 1).all()
    has_more = len(rows) > page_size
    rows = rows[:page_size]
    return {"items": rows, "has_more": has_more}
Authorization ("is this user allowed?") and data scoping ("which rows belong to this tenant?") are two separate concerns. The access check passes, but the query runs against the entire contacts table with no tenant_id condition, returning records from all tenants.
build_contact_filter is a shared query builder used by list_contacts, get_contact, update_contact, etc. Fixing it there means every caller automatically gets tenant scoping.
ORM rules are often bypassed in privileged contexts (superuser queries, background jobs, migrations). An explicit tenant_id filter is always applied regardless of context, providing defense in depth.
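Following those explanations, a sketch of the shared builder with tenant_id promoted to a required argument (the signature change is the suggested fix, not the original code):

```python
from typing import Optional

def build_contact_filter(
    tenant_id: int,                 # required: every caller is scoped by construction
    role: str,
    contact_id: Optional[int] = None,
    modified_since: Optional[str] = None,
    external_id: Optional[str] = None,
) -> dict:
    # tenant_id is always in the filter dict, so list/get/update callers
    # all inherit tenant scoping from this one place.
    filters = {"tenant_id": tenant_id, "role": role}
    if contact_id:
        filters["id"] = contact_id
    if modified_since:
        filters["updated_at__gte"] = modified_since
    if external_id:
        filters["external_id"] = external_id
    return filters

print(build_contact_filter(7, "customer", external_id="X-1"))
# {'tenant_id': 7, 'role': 'customer', 'external_id': 'X-1'}
```

Making tenant_id positional and required means a caller that forgets it fails loudly with a TypeError instead of silently querying every tenant's rows.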
Hard

Q3 — Atomic Write Operations

You are building an integration layer that receives contact records from an external sync service. When a contact is created, the system must perform two writes in sequence: (1) insert the contact into the local database, (2) write the external metadata (external_id, provider, synced_at). The external_id is the key used to deduplicate future sync calls. The sync service automatically retries any request that returns a 5xx response.

def find_existing_contact(db, tenant_id, external_id):
    return db.query("contacts").filter(
        tenant_id=tenant_id, external_id=external_id).first()


def create_contact(db, tenant_id, payload):
    external_id = payload.get("external_id")
    provider    = payload.get("provider")
    synced_at   = payload.get("synced_at")

    # Deduplicate: reject if this external_id already exists
    if external_id and find_existing_contact(db, tenant_id, external_id):
        raise ValueError(f"Contact already exists")

    # Step 1: insert the core contact record
    contact = db.insert("contacts", {
        "tenant_id": tenant_id,
        "name": payload.get("name"),
        "email": payload.get("email"),
    })
    # contact.id is now assigned. external_id is NOT yet written.

    # Step 2: write external metadata
    db.update("contacts", contact.id, {                     # line A
        "external_id": external_id,
        "provider": provider,
        "synced_at": synced_at,
    })
    return db.query("contacts").filter(id=contact.id).first()
The insert committed with external_id=NULL. The update failed, so the metadata was never written. On retry, find_existing_contact searches for external_id="CRM-99" and finds nothing. A second row is created, producing a duplicate orphan.
A savepoint groups the insert and update so both succeed or both roll back. If the update raises, the savepoint undoes the insert. The database returns to the state before create_contact was called.
Problem 1: If the DB raised on update, db.delete may fail for the same reason, leaving the orphan. Problem 2: Between insert and delete, the orphan row is visible to concurrent sessions. A savepoint keeps the insert invisible until the full block commits.
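The savepoint pattern can be demonstrated with sqlite3 (PostgreSQL savepoint semantics are the same; the schema and helper here are a simplified stand-in for the original create_contact):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.isolation_level = None  # autocommit: transactions managed via SAVEPOINT below
conn.execute(
    "CREATE TABLE contacts (id INTEGER PRIMARY KEY, name TEXT, external_id TEXT)")

def create_contact(conn, name, external_id, fail_metadata=False):
    cur = conn.cursor()
    cur.execute("SAVEPOINT create_contact")  # groups the insert and the update
    try:
        cur.execute("INSERT INTO contacts (name) VALUES (?)", (name,))
        contact_id = cur.lastrowid
        if fail_metadata:
            raise RuntimeError("metadata write failed")  # simulated step-2 failure
        cur.execute("UPDATE contacts SET external_id = ? WHERE id = ?",
                    (external_id, contact_id))
        cur.execute("RELEASE SAVEPOINT create_contact")
        return contact_id
    except Exception:
        # The insert is undone too: no orphan row with external_id = NULL survives.
        cur.execute("ROLLBACK TO SAVEPOINT create_contact")
        cur.execute("RELEASE SAVEPOINT create_contact")
        raise

try:
    create_contact(conn, "Ada", "CRM-99", fail_metadata=True)
except RuntimeError:
    pass
print(conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0])     # 0: no orphan
create_contact(conn, "Ada", "CRM-99")
print(conn.execute("SELECT external_id FROM contacts").fetchone()[0])  # CRM-99
```

Because the failed attempt leaves no half-written row behind, the sync service's retry passes the dedup check and creates exactly one fully populated contact.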
Hard

Q4 — Odoo ORM / Model Inheritance

An Odoo 17 module defines a model to track project budgets. The developer extended the existing project.project model with computed fields and a record rule for company-level isolation. Review the code for correctness.

from odoo import models, fields, api


class ProjectBudget(models.Model):
    _name = 'project.budget'        # line A
    _inherit = 'project.project'    # line B

    budget = fields.Float(string='Budget')
    spent = fields.Float(string='Spent')
    remaining = fields.Float(
        string='Remaining',
        compute='_compute_remaining',
        store=True,
    )

    @api.onchange('budget')         # line C
    def _compute_remaining(self):
        for rec in self:
            rec.remaining = rec.budget - rec.spent
<!-- Record rule XML -->
<record model="ir.rule" id="rule_budget_own_company">
  <field name="name">Budget: own company</field>
  <field name="model_id" ref="model_project_budget"/>
  <field name="domain_force">
    [('company_id','=',user.company_id.id)]
  </field>
  <field name="groups" eval="[]"/>           <!-- line D -->
</record>
In Odoo, when _name differs from _inherit, Odoo creates a brand new model with its own table (prototype inheritance). The new model copies all fields from the parent but stores data separately. Existing project.project records do NOT gain the budget fields. To extend in-place, use _inherit = 'project.project' without redefining _name.
@api.onchange only triggers during form interactions in the web client. It does NOT trigger on direct ORM write()/create() calls, background jobs, or XML-RPC. A stored computed field must use @api.depends('budget', 'spent') so the ORM recomputes whenever either dependency changes.
In Odoo, a record rule with empty groups is a global rule. Unlike group-specific rules (additive, bypassed by superuser), global rules are enforced for ALL users and cannot be bypassed even with sudo(). If any project.budget record has company_id = False, it becomes completely invisible to everyone.
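Combining the fixes above, the corrected extension would look roughly like this (a sketch against Odoo 17, extending project.project in place; not runnable outside an Odoo instance):

```python
from odoo import models, fields, api

class ProjectProject(models.Model):
    _inherit = 'project.project'   # extend in place: no _name, no separate table

    budget = fields.Float(string='Budget')
    spent = fields.Float(string='Spent')
    remaining = fields.Float(
        string='Remaining',
        compute='_compute_remaining',
        store=True,
    )

    @api.depends('budget', 'spent')   # recomputed on any ORM write, not just forms
    def _compute_remaining(self):
        for rec in self:
            rec.remaining = rec.budget - rec.spent
```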
Medium

Q5 — PostgreSQL Query & Migration

A backend team maintains a transactions table with ~8 million rows. They need to optimize a slow query and apply a schema migration. Review both operations.

-- Table definition:
CREATE TABLE transactions (
    id          BIGSERIAL PRIMARY KEY,
    tenant_id   UUID NOT NULL,
    status      VARCHAR(20) NOT NULL DEFAULT 'pending',
    amount      NUMERIC(12,2) NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Existing index:
CREATE INDEX idx_transactions_tenant
    ON transactions (tenant_id, created_at);

-- Slow query (takes 4.2s):
SELECT id, amount, created_at
FROM transactions
WHERE tenant_id = '...'
  AND status = 'completed'                    -- line A
  AND created_at >= '2025-01-01'
ORDER BY created_at DESC
LIMIT 50;

-- EXPLAIN ANALYZE output (excerpt):
--  Sort  (cost=28451..28452 rows=50)
--    -> Filter  (rows=50, rows removed by filter: 184720)  -- line B
--         -> Index Scan using idx_transactions_tenant
--             (rows=184770)

-- Migration script:
ALTER TABLE transactions
    ADD COLUMN notes TEXT NOT NULL;            -- line C
The existing index (tenant_id, created_at) lets PostgreSQL find rows for a given tenant within a date range, but it knows nothing about status. The database fetches all 184,770 matching rows, checks status = 'completed' on each, and discards 99.97%. A composite index (tenant_id, status, created_at) allows seeking directly to the right status.
When you add a NOT NULL column without a DEFAULT, PostgreSQL fails with ERROR: column contains null values. With a DEFAULT on PG 11+, the value is written to catalog metadata without touching rows — nearly instant and no lock.
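A sketch of both fixes in SQL (the index name is illustrative; note that CREATE INDEX CONCURRENTLY cannot run inside a transaction block):

```sql
-- Composite index matching the query's equality predicates plus the sort key;
-- CONCURRENTLY avoids blocking writes on the 8M-row table.
CREATE INDEX CONCURRENTLY idx_transactions_tenant_status_created
    ON transactions (tenant_id, status, created_at);

-- Safe on PG 11+: the DEFAULT is stored as catalog metadata, so the 8M rows
-- are not rewritten and the ALTER takes only a brief metadata-level lock.
ALTER TABLE transactions
    ADD COLUMN notes TEXT NOT NULL DEFAULT '';
```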
Medium

Q6 — FastAPI + Cloud Function

A Google Cloud Function (Python, HTTP-triggered) wraps a FastAPI app. It calls an external payment API and returns the result. Review the code for correctness and security.

import functions_framework
import httpx, os
from fastapi import FastAPI, Request
from mangum import Mangum

app = FastAPI()

@app.post("/charge")
async def charge(request: Request):
    body = await request.json()
    amount = body["amount"]
    token = body["payment_token"]
    api_key = os.environ.get("PAYMENT_API_KEY")

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.payments.example.com/v1/charges",
            json={"amount": amount, "source": token},
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30,
        )

    if resp.status_code != 200:
        return {                                          # line B
            "error": True,
            "detail": resp.text,
            "headers": dict(resp.headers),
        }
    return {"success": True, "charge_id": resp.json()["id"]}


@app.post("/refund")
def refund(request: Request):                             # line C
    body = request.json()                                 # line C (cont.)
    # ... process refund ...
    return {"success": True}

@functions_framework.http
def main(request):
    handler = Mangum(app)
    return handler(request)
Returning raw upstream error text and headers to the client is an information leak. The upstream API's errors may contain internal references and infrastructure details. Additionally, the function returns HTTP 200 with {"error": True} — clients checking HTTP status codes think the request succeeded. Should return 502 with a generic error message.
In FastAPI/Starlette, Request.json() is an async method that returns a coroutine. Calling it without await returns the coroutine object itself, not the parsed JSON. Any dictionary access on this coroutine raises TypeError. Fix: async def refund() and body = await request.json().
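The missing-await pitfall can be reproduced with a minimal stand-in for the request object (FakeRequest is a hypothetical stub, not the Starlette class, but its json() is async just like the real one):

```python
import asyncio
import inspect

class FakeRequest:
    # Stand-in for starlette's Request: json() is an async method.
    async def json(self):
        return {"amount": 100}

async def buggy_refund(request):
    body = request.json()          # missing await: body is a coroutine object
    return body

async def fixed_refund(request):
    body = await request.json()    # awaited: body is the parsed dict
    return body

buggy = asyncio.run(buggy_refund(FakeRequest()))
print(inspect.iscoroutine(buggy))  # True, so body["amount"] would raise TypeError
buggy.close()                      # suppress the "never awaited" warning
fixed = asyncio.run(fixed_refund(FakeRequest()))
print(fixed["amount"])             # 100
```

The same applies to the /refund endpoint in the question: declare it async def and await request.json().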
Medium

Q7 — CI/CD: GitHub Actions + Cloud Run

A team deploys a multi-service application. The CI/CD pipeline runs checks on PRs and auto-deploys on merge to main. The deployment workflow deploys interdependent Cloud Run services in sequence. Review both workflows.

# deploy.yml
name: Deploy to Cloud Run
on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - id: auth
        uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: ${{ secrets.WIF_PROVIDER }}
          service_account: ${{ secrets.WIF_SERVICE_ACCOUNT }}

      - name: Deploy API service
        run: |
          gcloud run deploy api-service \
            --image=europe-west1-docker.pkg.dev/proj/repo/api:${{ github.sha }} \
            --region=europe-west1

      - name: Deploy worker service
        run: |
          WORKER_URL=$(gcloud run services describe worker-service \  # line B
            --region=europe-west1 --format='value(status.url)')
          gcloud run deploy worker-service \
            --image=europe-west1-docker.pkg.dev/proj/repo/worker:${{ github.sha }} \
            --region=europe-west1 \
            --set-env-vars=API_URL=https://api-service-xxxxx.run.app  # line C
# pr-check.yml
name: PR Checks
on:
  pull_request:
    branches: [main]

jobs:
  deploy-preview:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - id: auth
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}        # line D
      - name: Deploy preview
        run: |
          gcloud run deploy preview-${{ github.event.pull_request.number }} \
            --image=europe-west1-docker.pkg.dev/proj/repo/api:${{ github.sha }} \
            --region=europe-west1
Line B reads the worker URL before deploying the new version (pointless — the URL doesn't change). Line C hardcodes a URL hash that will break if the API service is recreated, moved to a different region, or renamed. The fix is to read the API URL dynamically: API_URL=$(gcloud run services describe api-service --format='value(status.url)').
SA JSON keys are long-lived credentials that never expire. If the GitHub secret is leaked (repo compromise, log exposure, fork access), the attacker has permanent GCP access until the key is manually revoked. Workload Identity Federation uses short-lived tokens scoped to the specific workflow run, with no persistent credentials to leak.
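Applying both deploy fixes, the worker step might read the API URL dynamically after the API service has been deployed (region and image paths copied from the snippet; the exact step layout is illustrative):

```yaml
- name: Deploy worker service
  run: |
    # Read the API URL after the API deploy, so renames or region moves
    # cannot break a hardcoded value.
    API_URL=$(gcloud run services describe api-service \
      --region=europe-west1 --format='value(status.url)')
    gcloud run deploy worker-service \
      --image=europe-west1-docker.pkg.dev/proj/repo/worker:${{ github.sha }} \
      --region=europe-west1 \
      --set-env-vars=API_URL="${API_URL}"
```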