Skip to content

Token Vending

Token vending services transform credentials from various sources into EvidentSource-compatible JWTs with the evs:grants claim.

Use a token vending service when:

  • Your IdP cannot add custom claims (like evs:grants)
  • You need to derive grants from external systems (LDAP, database, etc.)
  • You’re integrating with AWS services that use IAM authentication
  • You need certificate-based (mTLS) authentication
┌─────────────────────┐ ┌──────────────────────────────────────────────┐
│ Source Token │ │ Token Vending Service │
│ (OIDC, IAM, Cert) │────▶│ │
└─────────────────────┘ │ 1. Validate source credential │
│ 2. Lookup grants (DynamoDB, LDAP, etc.) │
│ 3. Sign EvidentSource JWT │
│ │
└────────────────────┬─────────────────────────┘
┌──────────────────────────────────────────────┐
│ EvidentSource JWT │
│ │
│ { │
│ "sub": "user-123", │
│ "iss": "https://token-vending.example.com"│
│ "evs:grants": { ... } │
│ } │
└──────────────────────────────────────────────┘

ALB JWT Verification only supports RS256. Your token vending service must:

  1. Generate an RSA key pair (minimum 2048-bit)
  2. Publish the public key at a JWKS endpoint
  3. Sign tokens with the private key using RS256

Exchange IdP tokens for EvidentSource JWTs.

import json
import os
import time
import boto3
import jwt
import requests
from functools import lru_cache
# Configuration
PRIVATE_KEY = os.environ.get('JWT_PRIVATE_KEY')
ISSUER = os.environ.get('TOKEN_ISSUER')
TOKEN_LIFETIME_SECONDS = 3600
GRANTS_TABLE = os.environ.get('GRANTS_TABLE')
dynamodb = boto3.resource('dynamodb')
grants_table = dynamodb.Table(GRANTS_TABLE)
@lru_cache(maxsize=1)
def get_idp_jwks(issuer: str) -> dict:
"""Fetch and cache IdP JWKS."""
jwks_url = f"{issuer}/.well-known/jwks.json"
response = requests.get(jwks_url, timeout=10)
response.raise_for_status()
return response.json()
def validate_idp_token(token: str) -> dict:
"""Validate incoming IdP token and return claims."""
# Decode header to get key ID
unverified = jwt.decode(token, options={"verify_signature": False})
issuer = unverified.get('iss')
# Get JWKS from IdP
jwks = get_idp_jwks(issuer)
# Find the signing key
header = jwt.get_unverified_header(token)
key = None
for k in jwks['keys']:
if k['kid'] == header['kid']:
key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(k))
break
if not key:
raise ValueError("Key not found in JWKS")
# Verify token
return jwt.decode(token, key, algorithms=['RS256'], options={
'verify_aud': False, # Configure as needed
})
def lookup_grants(subject: str, email: str = None) -> dict:
"""Lookup grants from DynamoDB."""
response = grants_table.get_item(Key={'subject': subject})
if 'Item' not in response and email:
# Fallback to email lookup
response = grants_table.get_item(Key={'subject': email})
if 'Item' not in response:
# Return empty grants for unknown users
return {'global': [], 'databases': {}}
return response['Item'].get('grants', {'global': [], 'databases': {}})
def create_evs_token(claims: dict, grants: dict) -> str:
"""Create EvidentSource-compatible JWT."""
now = int(time.time())
payload = {
'iss': ISSUER,
'sub': claims['sub'],
'iat': now,
'exp': now + TOKEN_LIFETIME_SECONDS,
'evs:grants': grants,
}
# Include optional claims if present
if 'email' in claims:
payload['email'] = claims['email']
if 'name' in claims:
payload['name'] = claims['name']
return jwt.encode(payload, PRIVATE_KEY, algorithm='RS256')
def handler(event, context):
"""Lambda handler for token exchange."""
try:
# Parse request
body = json.loads(event.get('body', '{}'))
idp_token = body.get('token')
if not idp_token:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Missing token parameter'})
}
# Validate IdP token
claims = validate_idp_token(idp_token)
# Lookup grants
grants = lookup_grants(claims['sub'], claims.get('email'))
# Create EvidentSource token
evs_token = create_evs_token(claims, grants)
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'access_token': evs_token,
'token_type': 'Bearer',
'expires_in': TOKEN_LIFETIME_SECONDS,
})
}
except jwt.ExpiredSignatureError:
return {'statusCode': 401, 'body': json.dumps({'error': 'Token expired'})}
except jwt.InvalidTokenError as e:
return {'statusCode': 401, 'body': json.dumps({'error': f'Invalid token: {e}'})}
except Exception as e:
print(f"Error: {e}")
return {'statusCode': 500, 'body': json.dumps({'error': 'Internal error'})}
GrantsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: evs-grants
AttributeDefinitions:
- AttributeName: subject
AttributeType: S
KeySchema:
- AttributeName: subject
KeyType: HASH
BillingMode: PAY_PER_REQUEST

Example item:

{
"subject": "user@example.com",
"grants": {
"global": ["database_creator"],
"databases": {
"production": ["reader", "writer"],
"staging": ["reader", "writer", "deployer"]
}
}
}

Convert AWS IAM credentials to EvidentSource JWTs for AWS services.

import json
import os
import time
import boto3
import jwt
PRIVATE_KEY = os.environ.get('JWT_PRIVATE_KEY')
ISSUER = os.environ.get('TOKEN_ISSUER')
TOKEN_LIFETIME_SECONDS = 3600
sts = boto3.client('sts')
iam = boto3.client('iam')
def get_role_grants(role_arn: str) -> dict:
"""Extract grants from IAM role tags."""
# Parse role name from ARN
role_name = role_arn.split('/')[-1]
try:
response = iam.list_role_tags(RoleName=role_name)
tags = {t['Key']: t['Value'] for t in response['Tags']}
# Look for evs:grants tag
if 'evs:grants' in tags:
return json.loads(tags['evs:grants'])
# Or construct from individual tags
grants = {'global': [], 'databases': {}}
if 'evs:global-roles' in tags:
grants['global'] = tags['evs:global-roles'].split(',')
# Pattern: evs:db:<database_id> = role1,role2
for key, value in tags.items():
if key.startswith('evs:db:'):
db_id = key[7:] # Remove 'evs:db:' prefix
grants['databases'][db_id] = value.split(',')
return grants
except Exception as e:
print(f"Error getting role tags: {e}")
return {'global': [], 'databases': {}}
def create_evs_token(identity: dict, grants: dict) -> str:
"""Create EvidentSource-compatible JWT."""
now = int(time.time())
return jwt.encode({
'iss': ISSUER,
'sub': identity['Arn'],
'iat': now,
'exp': now + TOKEN_LIFETIME_SECONDS,
'evs:grants': grants,
'aws:account': identity['Account'],
'aws:user_id': identity['UserId'],
}, PRIVATE_KEY, algorithm='RS256')
def handler(event, context):
"""Lambda handler for IAM-to-JWT conversion."""
try:
# Get caller identity from the request context
# (Lambda must be invoked with IAM auth)
request_context = event.get('requestContext', {})
identity = request_context.get('identity', {})
# Or use STS to get identity from the Lambda execution context
caller = sts.get_caller_identity()
# Lookup grants from role tags
grants = get_role_grants(caller['Arn'])
# Create EvidentSource token
evs_token = create_evs_token(caller, grants)
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'access_token': evs_token,
'token_type': 'Bearer',
'expires_in': TOKEN_LIFETIME_SECONDS,
})
}
except Exception as e:
print(f"Error: {e}")
return {'statusCode': 500, 'body': json.dumps({'error': 'Internal error'})}
ServiceRole:
Type: AWS::IAM::Role
Properties:
RoleName: order-service-role
Tags:
- Key: evs:db:orders
Value: reader,writer
- Key: evs:db:inventory
Value: reader

Convert client certificates to EvidentSource JWTs.

import json
import os
import time
import boto3
import jwt
from cryptography import x509
from cryptography.hazmat.primitives import serialization
PRIVATE_KEY = os.environ.get('JWT_PRIVATE_KEY')
ISSUER = os.environ.get('TOKEN_ISSUER')
TOKEN_LIFETIME_SECONDS = 3600
GRANTS_TABLE = os.environ.get('GRANTS_TABLE')
dynamodb = boto3.resource('dynamodb')
grants_table = dynamodb.Table(GRANTS_TABLE)
def extract_cert_info(cert_pem: str) -> dict:
"""Extract subject info from X.509 certificate."""
cert = x509.load_pem_x509_certificate(cert_pem.encode())
subject = cert.subject
cn = subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
org = subject.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME)
return {
'cn': cn[0].value if cn else None,
'org': org[0].value if org else None,
'serial': cert.serial_number,
'not_after': cert.not_valid_after_utc.isoformat(),
}
def lookup_grants_by_cn(cn: str) -> dict:
"""Lookup grants by certificate CN."""
response = grants_table.get_item(Key={'subject': f'cert:{cn}'})
if 'Item' not in response:
return {'global': [], 'databases': {}}
return response['Item'].get('grants', {'global': [], 'databases': {}})
def create_evs_token(cert_info: dict, grants: dict) -> str:
"""Create EvidentSource-compatible JWT."""
now = int(time.time())
return jwt.encode({
'iss': ISSUER,
'sub': f"cert:{cert_info['cn']}",
'iat': now,
'exp': now + TOKEN_LIFETIME_SECONDS,
'evs:grants': grants,
'cert:cn': cert_info['cn'],
'cert:org': cert_info['org'],
}, PRIVATE_KEY, algorithm='RS256')
def handler(event, context):
"""Lambda handler for mTLS-to-JWT conversion."""
try:
# Get client certificate from API Gateway or ALB
# API Gateway: event['requestContext']['identity']['clientCert']
# ALB: event['headers']['x-amzn-mtls-clientcert']
cert_header = event.get('headers', {}).get('x-amzn-mtls-clientcert')
if not cert_header:
client_cert = event.get('requestContext', {}).get('identity', {}).get('clientCert', {})
cert_pem = client_cert.get('clientCertPem')
else:
# URL-decode the header
from urllib.parse import unquote
cert_pem = unquote(cert_header)
if not cert_pem:
return {
'statusCode': 401,
'body': json.dumps({'error': 'No client certificate provided'})
}
# Extract certificate info
cert_info = extract_cert_info(cert_pem)
# Lookup grants
grants = lookup_grants_by_cn(cert_info['cn'])
# Create EvidentSource token
evs_token = create_evs_token(cert_info, grants)
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'access_token': evs_token,
'token_type': 'Bearer',
'expires_in': TOKEN_LIFETIME_SECONDS,
})
}
except Exception as e:
print(f"Error: {e}")
return {'statusCode': 500, 'body': json.dumps({'error': 'Internal error'})}

Your token vending service must expose a JWKS endpoint for ALB JWT Verification.

import json
import os
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import jwt
PUBLIC_KEY_PEM = os.environ.get('JWT_PUBLIC_KEY')
KEY_ID = os.environ.get('JWT_KEY_ID', 'evs-token-vending-key')
def handler(event, context):
"""Return JWKS for token verification."""
# Load the public key
public_key = serialization.load_pem_public_key(
PUBLIC_KEY_PEM.encode(),
backend=default_backend()
)
# Convert to JWK format
from jwt.algorithms import RSAAlgorithm
jwk = json.loads(RSAAlgorithm.to_jwk(public_key))
jwk['kid'] = KEY_ID
jwk['use'] = 'sig'
jwk['alg'] = 'RS256'
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Cache-Control': 'public, max-age=3600',
},
'body': json.dumps({'keys': [jwk]})
}
Terminal window
# Generate private key
openssl genrsa -out private.pem 2048
# Extract public key
openssl rsa -in private.pem -pubout -out public.pem

Option 1: Secrets Manager

JwtPrivateKey:
Type: AWS::SecretsManager::Secret
Properties:
Name: evs-token-vending-private-key
SecretString: !Sub |
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----

Option 2: SSM Parameter Store

JwtPublicKey:
Type: AWS::SSM::Parameter
Properties:
Name: /evs/token-vending/public-key
Type: String
Value: |
-----BEGIN PUBLIC KEY-----
...
-----END PUBLIC KEY-----
  1. Generate new key pair
  2. Add new public key to JWKS endpoint (keep both keys)
  3. Update token vending to sign with new private key
  4. Wait for all existing tokens to expire
  5. Remove old public key from JWKS endpoint
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Parameters:
JwtPrivateKeySecret:
Type: String
Description: ARN of Secrets Manager secret containing JWT private key
Resources:
TokenVendingFunction:
Type: AWS::Serverless::Function
Properties:
Runtime: python3.11
Handler: index.handler
Timeout: 30
Environment:
Variables:
TOKEN_ISSUER: !Sub "https://${TokenVendingApi}.execute-api.${AWS::Region}.amazonaws.com"
GRANTS_TABLE: !Ref GrantsTable
Policies:
- DynamoDBReadPolicy:
TableName: !Ref GrantsTable
- Statement:
- Effect: Allow
Action: secretsmanager:GetSecretValue
Resource: !Ref JwtPrivateKeySecret
Events:
Exchange:
Type: Api
Properties:
Path: /exchange
Method: POST
Jwks:
Type: Api
Properties:
Path: /.well-known/jwks.json
Method: GET
GrantsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: evs-grants
AttributeDefinitions:
- AttributeName: subject
AttributeType: S
KeySchema:
- AttributeName: subject
KeyType: HASH
BillingMode: PAY_PER_REQUEST
import requests
def get_evs_token(idp_token: str) -> str:
"""Exchange IdP token for EvidentSource token."""
response = requests.post(
'https://token-vending.example.com/exchange',
json={'token': idp_token},
headers={'Content-Type': 'application/json'}
)
response.raise_for_status()
return response.json()['access_token']
# Use with EvidentSource SDK
evs_token = get_evs_token(my_idp_token)
client = await EvidentSource.connect(
"api.example.com:50051",
BearerTokenCredentials(evs_token)
)