Token Vending
Token vending services transform credentials from various sources into EvidentSource-compatible JWTs with the evs:grants claim.
When to Use Token Vending
Section titled “When to Use Token Vending”Use a token vending service when:
- Your IdP cannot add custom claims (like
evs:grants) - You need to derive grants from external systems (LDAP, database, etc.)
- You’re integrating with AWS services that use IAM authentication
- You need certificate-based (mTLS) authentication
Architecture
Section titled “Architecture”┌─────────────────────┐ ┌──────────────────────────────────────────────┐│ Source Token │ │ Token Vending Service ││ (OIDC, IAM, Cert) │────▶│ │└─────────────────────┘ │ 1. Validate source credential │ │ 2. Lookup grants (DynamoDB, LDAP, etc.) │ │ 3. Sign EvidentSource JWT │ │ │ └────────────────────┬─────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────┐ │ EvidentSource JWT │ │ │ │ { │ │ "sub": "user-123", │ │ "iss": "https://token-vending.example.com"│ │ "evs:grants": { ... } │ │ } │ └──────────────────────────────────────────────┘JWT Signing Requirements
Section titled “JWT Signing Requirements”ALB JWT Verification only supports RS256. Your token vending service must:
- Generate an RSA key pair (minimum 2048-bit)
- Publish the public key at a JWKS endpoint
- Sign tokens with the private key using RS256
Pattern 1: OIDC Token Exchange
Section titled “Pattern 1: OIDC Token Exchange”Exchange IdP tokens for EvidentSource JWTs.
Lambda Function
Section titled “Lambda Function”import jsonimport osimport timeimport boto3import jwtimport requestsfrom functools import lru_cache
# ConfigurationPRIVATE_KEY = os.environ.get('JWT_PRIVATE_KEY')ISSUER = os.environ.get('TOKEN_ISSUER')TOKEN_LIFETIME_SECONDS = 3600GRANTS_TABLE = os.environ.get('GRANTS_TABLE')
dynamodb = boto3.resource('dynamodb')grants_table = dynamodb.Table(GRANTS_TABLE)
@lru_cache(maxsize=1)def get_idp_jwks(issuer: str) -> dict: """Fetch and cache IdP JWKS.""" jwks_url = f"{issuer}/.well-known/jwks.json" response = requests.get(jwks_url, timeout=10) response.raise_for_status() return response.json()
def validate_idp_token(token: str) -> dict: """Validate incoming IdP token and return claims.""" # Decode header to get key ID unverified = jwt.decode(token, options={"verify_signature": False}) issuer = unverified.get('iss')
# Get JWKS from IdP jwks = get_idp_jwks(issuer)
# Find the signing key header = jwt.get_unverified_header(token) key = None for k in jwks['keys']: if k['kid'] == header['kid']: key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(k)) break
if not key: raise ValueError("Key not found in JWKS")
# Verify token return jwt.decode(token, key, algorithms=['RS256'], options={ 'verify_aud': False, # Configure as needed })
def lookup_grants(subject: str, email: str = None) -> dict: """Lookup grants from DynamoDB.""" response = grants_table.get_item(Key={'subject': subject})
if 'Item' not in response and email: # Fallback to email lookup response = grants_table.get_item(Key={'subject': email})
if 'Item' not in response: # Return empty grants for unknown users return {'global': [], 'databases': {}}
return response['Item'].get('grants', {'global': [], 'databases': {}})
def create_evs_token(claims: dict, grants: dict) -> str: """Create EvidentSource-compatible JWT.""" now = int(time.time())
payload = { 'iss': ISSUER, 'sub': claims['sub'], 'iat': now, 'exp': now + TOKEN_LIFETIME_SECONDS, 'evs:grants': grants, }
# Include optional claims if present if 'email' in claims: payload['email'] = claims['email'] if 'name' in claims: payload['name'] = claims['name']
return jwt.encode(payload, PRIVATE_KEY, algorithm='RS256')
def handler(event, context): """Lambda handler for token exchange.""" try: # Parse request body = json.loads(event.get('body', '{}')) idp_token = body.get('token')
if not idp_token: return { 'statusCode': 400, 'body': json.dumps({'error': 'Missing token parameter'}) }
# Validate IdP token claims = validate_idp_token(idp_token)
# Lookup grants grants = lookup_grants(claims['sub'], claims.get('email'))
# Create EvidentSource token evs_token = create_evs_token(claims, grants)
return { 'statusCode': 200, 'headers': {'Content-Type': 'application/json'}, 'body': json.dumps({ 'access_token': evs_token, 'token_type': 'Bearer', 'expires_in': TOKEN_LIFETIME_SECONDS, }) }
except jwt.ExpiredSignatureError: return {'statusCode': 401, 'body': json.dumps({'error': 'Token expired'})} except jwt.InvalidTokenError as e: return {'statusCode': 401, 'body': json.dumps({'error': f'Invalid token: {e}'})} except Exception as e: print(f"Error: {e}") return {'statusCode': 500, 'body': json.dumps({'error': 'Internal error'})}DynamoDB Grants Table Schema
Section titled “DynamoDB Grants Table Schema”GrantsTable: Type: AWS::DynamoDB::Table Properties: TableName: evs-grants AttributeDefinitions: - AttributeName: subject AttributeType: S KeySchema: - AttributeName: subject KeyType: HASH BillingMode: PAY_PER_REQUESTExample item:
{ "subject": "user@example.com", "grants": { "global": ["database_creator"], "databases": { "production": ["reader", "writer"], "staging": ["reader", "writer", "deployer"] } }}Pattern 2: IAM to JWT
Section titled “Pattern 2: IAM to JWT”Convert AWS IAM credentials to EvidentSource JWTs for AWS services.
Lambda Function
Section titled “Lambda Function”import jsonimport osimport timeimport boto3import jwt
PRIVATE_KEY = os.environ.get('JWT_PRIVATE_KEY')ISSUER = os.environ.get('TOKEN_ISSUER')TOKEN_LIFETIME_SECONDS = 3600
sts = boto3.client('sts')iam = boto3.client('iam')
def get_role_grants(role_arn: str) -> dict: """Extract grants from IAM role tags.""" # Parse role name from ARN role_name = role_arn.split('/')[-1]
try: response = iam.list_role_tags(RoleName=role_name) tags = {t['Key']: t['Value'] for t in response['Tags']}
# Look for evs:grants tag if 'evs:grants' in tags: return json.loads(tags['evs:grants'])
# Or construct from individual tags grants = {'global': [], 'databases': {}}
if 'evs:global-roles' in tags: grants['global'] = tags['evs:global-roles'].split(',')
# Pattern: evs:db:<database_id> = role1,role2 for key, value in tags.items(): if key.startswith('evs:db:'): db_id = key[7:] # Remove 'evs:db:' prefix grants['databases'][db_id] = value.split(',')
return grants
except Exception as e: print(f"Error getting role tags: {e}") return {'global': [], 'databases': {}}
def create_evs_token(identity: dict, grants: dict) -> str: """Create EvidentSource-compatible JWT.""" now = int(time.time())
return jwt.encode({ 'iss': ISSUER, 'sub': identity['Arn'], 'iat': now, 'exp': now + TOKEN_LIFETIME_SECONDS, 'evs:grants': grants, 'aws:account': identity['Account'], 'aws:user_id': identity['UserId'], }, PRIVATE_KEY, algorithm='RS256')
def handler(event, context): """Lambda handler for IAM-to-JWT conversion.""" try: # Get caller identity from the request context # (Lambda must be invoked with IAM auth) request_context = event.get('requestContext', {}) identity = request_context.get('identity', {})
# Or use STS to get identity from the Lambda execution context caller = sts.get_caller_identity()
# Lookup grants from role tags grants = get_role_grants(caller['Arn'])
# Create EvidentSource token evs_token = create_evs_token(caller, grants)
return { 'statusCode': 200, 'headers': {'Content-Type': 'application/json'}, 'body': json.dumps({ 'access_token': evs_token, 'token_type': 'Bearer', 'expires_in': TOKEN_LIFETIME_SECONDS, }) }
except Exception as e: print(f"Error: {e}") return {'statusCode': 500, 'body': json.dumps({'error': 'Internal error'})}IAM Role Tags Example
Section titled “IAM Role Tags Example”ServiceRole: Type: AWS::IAM::Role Properties: RoleName: order-service-role Tags: - Key: evs:db:orders Value: reader,writer - Key: evs:db:inventory Value: readerPattern 3: mTLS to JWT
Section titled “Pattern 3: mTLS to JWT”Convert client certificates to EvidentSource JWTs.
Lambda Function
Section titled “Lambda Function”import jsonimport osimport timeimport boto3import jwtfrom cryptography import x509from cryptography.hazmat.primitives import serialization
PRIVATE_KEY = os.environ.get('JWT_PRIVATE_KEY')ISSUER = os.environ.get('TOKEN_ISSUER')TOKEN_LIFETIME_SECONDS = 3600GRANTS_TABLE = os.environ.get('GRANTS_TABLE')
dynamodb = boto3.resource('dynamodb')grants_table = dynamodb.Table(GRANTS_TABLE)
def extract_cert_info(cert_pem: str) -> dict: """Extract subject info from X.509 certificate.""" cert = x509.load_pem_x509_certificate(cert_pem.encode())
subject = cert.subject cn = subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME) org = subject.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME)
return { 'cn': cn[0].value if cn else None, 'org': org[0].value if org else None, 'serial': cert.serial_number, 'not_after': cert.not_valid_after_utc.isoformat(), }
def lookup_grants_by_cn(cn: str) -> dict: """Lookup grants by certificate CN.""" response = grants_table.get_item(Key={'subject': f'cert:{cn}'})
if 'Item' not in response: return {'global': [], 'databases': {}}
return response['Item'].get('grants', {'global': [], 'databases': {}})
def create_evs_token(cert_info: dict, grants: dict) -> str: """Create EvidentSource-compatible JWT.""" now = int(time.time())
return jwt.encode({ 'iss': ISSUER, 'sub': f"cert:{cert_info['cn']}", 'iat': now, 'exp': now + TOKEN_LIFETIME_SECONDS, 'evs:grants': grants, 'cert:cn': cert_info['cn'], 'cert:org': cert_info['org'], }, PRIVATE_KEY, algorithm='RS256')
def handler(event, context): """Lambda handler for mTLS-to-JWT conversion.""" try: # Get client certificate from API Gateway or ALB # API Gateway: event['requestContext']['identity']['clientCert'] # ALB: event['headers']['x-amzn-mtls-clientcert']
cert_header = event.get('headers', {}).get('x-amzn-mtls-clientcert') if not cert_header: client_cert = event.get('requestContext', {}).get('identity', {}).get('clientCert', {}) cert_pem = client_cert.get('clientCertPem') else: # URL-decode the header from urllib.parse import unquote cert_pem = unquote(cert_header)
if not cert_pem: return { 'statusCode': 401, 'body': json.dumps({'error': 'No client certificate provided'}) }
# Extract certificate info cert_info = extract_cert_info(cert_pem)
# Lookup grants grants = lookup_grants_by_cn(cert_info['cn'])
# Create EvidentSource token evs_token = create_evs_token(cert_info, grants)
return { 'statusCode': 200, 'headers': {'Content-Type': 'application/json'}, 'body': json.dumps({ 'access_token': evs_token, 'token_type': 'Bearer', 'expires_in': TOKEN_LIFETIME_SECONDS, }) }
except Exception as e: print(f"Error: {e}") return {'statusCode': 500, 'body': json.dumps({'error': 'Internal error'})}JWKS Endpoint
Section titled “JWKS Endpoint”Your token vending service must expose a JWKS endpoint for ALB JWT Verification.
Lambda Function
Section titled “Lambda Function”import jsonimport osfrom cryptography.hazmat.primitives import serializationfrom cryptography.hazmat.backends import default_backendimport jwt
PUBLIC_KEY_PEM = os.environ.get('JWT_PUBLIC_KEY')KEY_ID = os.environ.get('JWT_KEY_ID', 'evs-token-vending-key')
def handler(event, context): """Return JWKS for token verification.""" # Load the public key public_key = serialization.load_pem_public_key( PUBLIC_KEY_PEM.encode(), backend=default_backend() )
# Convert to JWK format from jwt.algorithms import RSAAlgorithm jwk = json.loads(RSAAlgorithm.to_jwk(public_key)) jwk['kid'] = KEY_ID jwk['use'] = 'sig' jwk['alg'] = 'RS256'
return { 'statusCode': 200, 'headers': { 'Content-Type': 'application/json', 'Cache-Control': 'public, max-age=3600', }, 'body': json.dumps({'keys': [jwk]}) }Key Management
Section titled “Key Management”Generating RSA Keys
Section titled “Generating RSA Keys”# Generate private keyopenssl genrsa -out private.pem 2048
# Extract public keyopenssl rsa -in private.pem -pubout -out public.pemStoring Keys in AWS
Section titled “Storing Keys in AWS”Option 1: Secrets Manager
JwtPrivateKey: Type: AWS::SecretsManager::Secret Properties: Name: evs-token-vending-private-key SecretString: !Sub | -----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----Option 2: SSM Parameter Store
JwtPublicKey: Type: AWS::SSM::Parameter Properties: Name: /evs/token-vending/public-key Type: String Value: | -----BEGIN PUBLIC KEY----- ... -----END PUBLIC KEY-----Key Rotation
Section titled “Key Rotation”- Generate new key pair
- Add new public key to JWKS endpoint (keep both keys)
- Update token vending to sign with new private key
- Wait for all existing tokens to expire
- Remove old public key from JWKS endpoint
Deployment
Section titled “Deployment”CloudFormation Template
Section titled “CloudFormation Template”AWSTemplateFormatVersion: '2010-09-09'Transform: AWS::Serverless-2016-10-31
Parameters: JwtPrivateKeySecret: Type: String Description: ARN of Secrets Manager secret containing JWT private key
Resources: TokenVendingFunction: Type: AWS::Serverless::Function Properties: Runtime: python3.11 Handler: index.handler Timeout: 30 Environment: Variables: TOKEN_ISSUER: !Sub "https://${TokenVendingApi}.execute-api.${AWS::Region}.amazonaws.com" GRANTS_TABLE: !Ref GrantsTable Policies: - DynamoDBReadPolicy: TableName: !Ref GrantsTable - Statement: - Effect: Allow Action: secretsmanager:GetSecretValue Resource: !Ref JwtPrivateKeySecret Events: Exchange: Type: Api Properties: Path: /exchange Method: POST Jwks: Type: Api Properties: Path: /.well-known/jwks.json Method: GET
GrantsTable: Type: AWS::DynamoDB::Table Properties: TableName: evs-grants AttributeDefinitions: - AttributeName: subject AttributeType: S KeySchema: - AttributeName: subject KeyType: HASH BillingMode: PAY_PER_REQUESTClient Usage
Section titled “Client Usage”Python
Section titled “Python”import requests
def get_evs_token(idp_token: str) -> str: """Exchange IdP token for EvidentSource token.""" response = requests.post( 'https://token-vending.example.com/exchange', json={'token': idp_token}, headers={'Content-Type': 'application/json'} ) response.raise_for_status() return response.json()['access_token']
# Use with EvidentSource SDKevs_token = get_evs_token(my_idp_token)client = await EvidentSource.connect( "api.example.com:50051", BearerTokenCredentials(evs_token))