Skip to content

Automation Workflows

VaultStream provides a webhook system and event-driven API that enables customers to build automated content pipelines. This guide covers common automation patterns.

Webhooks

Webhooks deliver real-time notifications for content lifecycle events:

Event Types

Event Trigger Payload
content.uploaded File upload complete content_id, title, size_bytes, uploaded_by
content.transcoded Transcoding complete content_id, variants[], duration_seconds
content.deleted Content removed content_id, deleted_by, deleted_at
content.updated Metadata changed content_id, changed_fields[]
playback.started Viewer starts watching content_id, viewer_id, session_id
playback.completed Viewer finishes (90%+) content_id, viewer_id, watch_time_seconds
user.provisioned New user created user_id, email, role
user.deactivated User deactivated user_id, email

Webhook Configuration

POST /v1/admin/webhooks
Authorization: Bearer <ADMIN_TOKEN>
{
  "url": "https://internal.acme.corp/webhooks/vaultstream",
  "events": ["content.transcoded", "playback.completed"],
  "secret": "whsec_xxxxxxxxxxxxxxxx",
  "active": true
}

Webhook Signature Verification

import hmac, hashlib, json

def verify_signature(payload_body, signature_header, secret):
    """Verify webhook signature using HMAC-SHA256."""
    expected = hmac.new(
        secret.encode(),
        payload_body,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature_header)

# Flask example
@app.route("/webhooks/vaultstream", methods=["POST"])
def vaultstream_webhook():
    signature = request.headers.get("VaultStream-Signature")
    if not verify_signature(request.data, signature, os.environ["VS_WEBHOOK_SECRET"]):
        return "Invalid signature", 401

    event = request.json
    if event["type"] == "content.transcoded":
        cid = event["data"]["content_id"]
        # Trigger downstream processing
        notify_team(f"Video ready: {cid}")
        update_catalog(cid, event["data"])

    return "", 200

Common Automation Patterns

Pattern 1: Auto-Publishing Pipeline

When a video finishes transcoding, automatically publish it to the appropriate team folder:

@app.route("/webhooks/vaultstream", methods=["POST"])
def handle_transcode_complete():
    event = request.json
    if event["type"] != "content.transcoded":
        return "", 200

    cid = event["data"]["content_id"]
    title = event["data"]["title"]

    # Determine target folder from metadata
    resp = requests.get(
        f"https://api.cyfr.technology/v1/content/{cid}",
        headers={"Authorization": f"Bearer {API_TOKEN}"}
    )
    meta = resp.json().get("metadata", {})

    department = meta.get("department", "General")
    folder_id = get_or_create_folder(department)

    # Move to folder
    requests.patch(
        f"https://api.cyfr.technology/v1/content/{cid}",
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        json={"folder_id": folder_id, "visibility": "team"}
    )

    # Grant access to department team
    team_id = get_department_team(department)
    requests.post(
        f"https://api.cyfr.technology/v1/content/{cid}/permissions",
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        json={"team_id": team_id, "permission": "view"}
    )

    logger.info(f"Published {title} to {department} folder")
    return "", 200

Pattern 2: Scheduled Content Archival

Archive videos that haven't been watched in 90 days:

import requests
from datetime import datetime, timedelta

API = "https://api.cyfr.technology/v1"
TOKEN = os.environ["VS_ADMIN_TOKEN"]
ARCHIVE_DAYS = 90

def archive_stale_content():
    """Move unwatched content to archive after threshold."""
    cutoff = (datetime.utcnow() - timedelta(days=ARCHIVE_DAYS)).isoformat()

    # Query analytics for content with no recent views
    resp = requests.get(
        f"{API}/analytics/content-usage",
        headers={"Authorization": f"Bearer {TOKEN}"},
        params={"last_accessed_before": cutoff, "limit": 100}
    )
    stale = resp.json()["items"]

    import requests
    from datetime import datetime, timedelta

    for item in stale:
        requests.patch(
            f"{API}/content/{item['content_id']}",
            headers={"Authorization": f"Bearer {TOKEN}"},
            json={"folder_id": ARCHIVE_FOLDER_ID}
        )
        print(f"Archived: {item['title']} (last accessed {item['last_accessed']})")

if __name__ == "__main__":
    archive_stale_content()

Pattern 3: Custom Content Organization

Organize ingested content based on filename patterns or metadata:

def classify_content(content_id, title, metadata):
    """Apply organizational rules based on content characteristics."""

    title_lower = title.lower()

    # Training content → Training folder
    if any(kw in title_lower for kw in ['training', 'module', 'onboarding', 'compliance']):
        return {"folder": "Training", "tags": ["training"]}

    # Executive communications → Leadership folder
    if any(kw in title_lower for kw in ['town hall', 'all-hands', 'ceo', 'quarterly']):
        return {"folder": "Leadership", "visibility": "team"}

    # Department-specific routing
    dept = metadata.get("department", "").lower()
    if dept:
        return {"folder": dept.title(), "tags": [dept]}

    # Default
    return {"folder": "Uncategorized", "tags": []}

Pattern 4: Multi-Format Export Pipeline

When a video is uploaded, automatically generate derivative formats:

@app.route("/webhooks/vaultstream", methods=["POST"])
def generate_derivatives():
    event = request.json
    if event["type"] != "content.transcoded":
        return "", 200

    cid = event["data"]["content_id"]

    # Request audio-only version for podcast distribution
    requests.post(
        f"https://api.cyfr.technology/v1/content/{cid}/transcode",
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        json={
            "profile": "audio_only_aac",
            "output_format": "m4a"
        }
    )

    # Request GIF preview for social media
    requests.post(
        f"https://api.cyfr.technology/v1/content/{cid}/export",
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        json={
            "format": "gif",
            "start_seconds": 30,
            "duration_seconds": 10,
            "width": 480
        }
    )

    return "", 200

Scheduled Jobs

VaultStream's Admin API supports CRON-like scheduling for recurring automation tasks:

POST /v1/admin/scheduled-jobs
{
  "name": "Nightly Archive Check",
  "schedule": "0 2 * * *",
  "action": "run_script",
  "script_url": "https://internal.acme.corp/scripts/archive-stale.py",
  "timeout_seconds": 300
}

Important Note on Platform Responsibility

VaultStream provides the ingestion API, transcoding infrastructure, and automation framework. What content customers upload, how they organize it, and who they grant access to is entirely under the customer's control. CYFR Technologies operates a zero-knowledge architecture — we cannot access, inspect, or monitor customer content. We provide the platform; customers are responsible for the content they manage through it.

This separation is fundamental to our security model and is documented in our Terms of Service and DMCA Policy.