🔧 Async events with AppSync Events API


🔗 Source: dev.to

Photo by Dawn McDonald on Unsplash

Simple setup for live events with AppSync Events

Scenario

Let's build a simple system where users can log in and request a report. Generating a report takes a while, and we don't want users to wait for it, so we will notify them once the report is ready.

What we are building

To achieve this, I will use the AppSync Events API, a fairly new service that makes it easy to implement WebSockets.

Architecture

The AppSync Events API consists of two main pieces: channels for asynchronous communication, and an HTTP endpoint for publishing events to those channels.

The user will use the web page to log into the system and connect to the channel generated-reports/<user-id>. It is important that users can see only their own reports.
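
The per-user channel name can be built and checked in one place. The sketch below is a hypothetical helper, not part of the original repo. It assumes the limits I recall from the AppSync Events documentation (at most five segments per path, each segment up to 50 letters, digits, or dashes), so verify those numbers before relying on them.

```typescript
// Assumed limits (check the AppSync Events docs): 1 to 5 segments,
// each made of 1 to 50 letters, digits, or dashes.
const SEGMENT_PATTERN = /^[A-Za-z0-9-]{1,50}$/;
const MAX_SEGMENTS = 5;

// Hypothetical helper: joins segments into a channel path and rejects
// anything that could not form a valid path under the assumed limits.
function channelPath(...segments: string[]): string {
  if (segments.length === 0 || segments.length > MAX_SEGMENTS) {
    throw new RangeError(`expected 1 to ${MAX_SEGMENTS} segments, got ${segments.length}`);
  }
  for (const segment of segments) {
    if (!SEGMENT_PATTERN.test(segment)) {
      throw new Error(`invalid channel segment: "${segment}"`);
    }
  }
  return segments.join("/");
}

// Channel used in this article: generated-reports/<user-id>.
function reportChannelFor(userId: string): string {
  return channelPath("generated-reports", userId);
}
```

A UUID-style Cognito username passes this check, while an ID containing a slash or an @ sign would be rejected before it reaches AppSync.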

To generate a new report, the user calls the generate-report endpoint on API Gateway. From there, the Lambda function extracts the user ID from the authorizer context and starts a Step Functions state machine, which simulates the long-running task. Once the job is done, the state machine publishes the result to the generated-reports/<user-id> channel.

Implementation

The code is available IN THIS REPO

Authentication

For auth I am using Cognito.

With AWS CDK, setting up Cognito is quite straightforward. The only tricky part is that we need to define a default authenticated role and attach it to the identity pool.

// lib/appsync_events-stack.ts

// ...
    const cognitoUserPool = new cdk.aws_cognito.UserPool(this, "EventsApp", {
      selfSignUpEnabled: true,
      signInAliases: { email: true },
      autoVerify: { email: true },
      passwordPolicy: {
        minLength: 8,
        requireLowercase: true,
        requireUppercase: true,
        requireDigits: true,
      },
    });

    const cognitoUserPoolClient = new cdk.aws_cognito.UserPoolClient(
      this,
      "EventsAppClient",
      {
        userPool: cognitoUserPool,
        generateSecret: false,
        authFlows: {
          adminUserPassword: true,
          userSrp: true,
        },
      }
    );

    const cognitoIdentity = new cdk.aws_cognito.CfnIdentityPool(
      this,
      "EventsAppIdentityProvider",
      {
        allowUnauthenticatedIdentities: false,
        cognitoIdentityProviders: [
          {
            clientId: cognitoUserPoolClient.userPoolClientId,
            providerName: cognitoUserPool.userPoolProviderName,
          },
        ],
      }
    );


    const authenticatedRole = new cdk.aws_iam.Role(
      this,
      "CognitoDefaultAuthenticatedRole",
      {
        assumedBy: new cdk.aws_iam.FederatedPrincipal(
          "cognito-identity.amazonaws.com",
          {
            StringEquals: {
              "cognito-identity.amazonaws.com:aud": cognitoIdentity.ref,
            },
            "ForAnyValue:StringLike": {
              "cognito-identity.amazonaws.com:amr": "authenticated",
            },
          },
          "sts:AssumeRoleWithWebIdentity"
        ),
      }
    );

    const defaultPolicy = new cdk.aws_cognito.CfnIdentityPoolRoleAttachment(
      this,
      "IdentityPoolRoleAttachment",
      {
        identityPoolId: cognitoIdentity.ref,
        roles: {
          authenticated: authenticatedRole.roleArn,
        },
      }
    );
//...

AppSync Events

At the time of writing, there are no higher-level AWS CDK constructs for AppSync Events yet. This is not a blocker: CloudFormation supports the service, so I can use L1 constructs inside CDK. In other words, I can write CloudFormation inside my TypeScript CDK stack definition.

The initial declaration contains only some information about auth providers:

// lib/appsync_events-stack.ts

//...
    const eventsAPI = new cdk.aws_appsync.CfnApi(this, "MyEventsAPI", {
      name: "MyEventsAPI",
      eventConfig: {
        authProviders: [
          {
            authType: "API_KEY",
          },
          {
            authType: "AMAZON_COGNITO_USER_POOLS",
            cognitoConfig: {
              userPoolId: cognitoUserPool.userPoolId,
              awsRegion: "us-east-1",
            },
          },
        ],
        connectionAuthModes: [
          {
            authType: "AMAZON_COGNITO_USER_POOLS",
          },
        ],
        defaultPublishAuthModes: [
          {
            authType: "API_KEY",
          },
          {
            authType: "AMAZON_COGNITO_USER_POOLS",
          },
        ],
        defaultSubscribeAuthModes: [
          {
            authType: "AMAZON_COGNITO_USER_POOLS",
          },
        ],
      },
    });
// ...

I picked two auth modes. The Cognito user pool is for users, and the API key is for my Step Functions state machine.
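
To make the difference between the two modes concrete, here is a minimal sketch of the HTTP headers a publisher would send in each case. The type and function names are hypothetical. The header names follow the usual AppSync convention as I understand it: x-api-key for API keys, and the raw JWT in the Authorization header for Cognito user pools.

```typescript
// The two auth modes configured above, modeled as a discriminated union.
// These names are illustrative and do not come from any AWS library.
type PublishAuth =
  | { mode: "apiKey"; apiKey: string }
  | { mode: "userPool"; jwt: string };

// Builds the headers for a POST to the Events API HTTP endpoint.
function publishHeaders(auth: PublishAuth): Record<string, string> {
  const headers: Record<string, string> = {
    "content-type": "application/json",
  };
  if (auth.mode === "apiKey") {
    // API key mode: this is what the state machine uses in this article.
    headers["x-api-key"] = auth.apiKey;
  } else {
    // User pool mode: assumed to take the token without a "Bearer" prefix.
    headers["authorization"] = auth.jwt;
  }
  return headers;
}
```

Keeping the two modes in one union means the compiler forces every caller to say which credential it is using.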

That is almost it. I still need to define the channel namespace and the API key:

// lib/appsync_events-stack.ts

//...
    const generatedRequestNamespace = new cdk.aws_appsync.CfnChannelNamespace(
      this,
      "GeneratedRequestNamespace",
      {
        apiId: eventsAPI.attrApiId,
        name: "generated-reports",
      }
    );

    const eventsApiKey = new cdk.aws_appsync.CfnApiKey(this, "EventsApiKey", {
      apiId: eventsAPI.attrApiId,
    });
// ...

StepFunction

My state machine is fairly simple and contains two steps: waiting, and publishing the event.

To do the latter I will use the Call HTTPS APIs action, which is the absolute killer feature of Step Functions, as it allows integration with any API-based service inside or outside AWS.

To call the endpoint securely, I need to define a Connection (the same kind we would use for API Destinations in EventBridge):

// lib/appsync_events-stack.ts

// ...
    const eventsAPIKeySecret = new cdk.aws_secretsmanager.Secret(
      this,
      "EventsApiKeySecret"
    );

    const eventsConnection = new cdk.aws_events.Connection(
      this,
      "EventsAPIConnection",
      {
        authorization: cdk.aws_events.Authorization.apiKey(
          "x-api-key",
          cdk.SecretValue.secretsManager(eventsAPIKeySecret.secretName)
        ),
      }
    );
// ...

Here is where my code gets a bit clunky: I need a Secret defined for the Connection, even though under the hood the Connection creates its own secret in Secrets Manager. I haven't yet figured out how to get rid of the spare custom Secret.

Here are the tasks defined for the state machine. For anything larger than a small demo I would use a separate .asl.json file to store the state machine definition. I almost always debug and update the state machine in the AWS console, and then download the updated definition to keep it in source control. This approach has its pros and cons, but that is a topic for another discussion.

In this example, the tasks are defined in the CDK:

// lib/appsync_events-stack.ts

//...
    const waitTask = new cdk.aws_stepfunctions.Wait(this, "ComplexTask", {
      time: cdk.aws_stepfunctions.WaitTime.duration(cdk.Duration.seconds(30)),
    });

    const publishToAppsync = new cdk.aws_stepfunctions_tasks.HttpInvoke(
      this,
      "PublishToAppSync",
      {
        apiEndpoint: cdk.aws_stepfunctions.TaskInput.fromText("/event"),
        method: cdk.aws_stepfunctions.TaskInput.fromText("POST"),
        connection: eventsConnection,
        body: cdk.aws_stepfunctions.TaskInput.fromJsonPathAt("$.input"),
        apiRoot: "https://" + eventsAPI.getAtt("Dns.Http").toString(),
      }
    );
// ...
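
The HTTP task forwards whatever sits under $.input as the request body, so the state machine input has to wrap a publish request in exactly that shape. The sketch below mirrors the structs used by the Lambda function in this article; the function name is hypothetical, and the batch limit of five events per publish is my recollection of the service quota, so treat it as an assumption.

```typescript
// Body expected by the Events API publish endpoint: each event is a
// JSON document serialized to a string.
interface PublishRequest {
  channel: string;
  events: string[];
}

// State machine input: the HTTP task reads the body from "$.input".
interface StateMachineInput {
  input: PublishRequest;
}

// Assumed service limit on events per publish call.
const MAX_EVENTS_PER_PUBLISH = 5;

// Hypothetical helper that stringifies payloads and wraps them for the
// state machine.
function toStateMachineInput(channel: string, payloads: unknown[]): StateMachineInput {
  if (payloads.length === 0 || payloads.length > MAX_EVENTS_PER_PUBLISH) {
    throw new RangeError(`expected 1 to ${MAX_EVENTS_PER_PUBLISH} events, got ${payloads.length}`);
  }
  return {
    input: {
      channel,
      events: payloads.map((payload) => JSON.stringify(payload)),
    },
  };
}
```

Note the double encoding: the events are strings inside the JSON body, which is why the Rust handler calls serde_json::to_string on the message before placing it in the vector.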

The StepFunction definition brings it all together

// lib/appsync_events-stack.ts

// ...
    const longTaskSfn = new cdk.aws_stepfunctions.StateMachine(
      this,
      "LongTaskStateMachine",
      {
        definition: waitTask.next(publishToAppsync),
        stateMachineType: cdk.aws_stepfunctions.StateMachineType.EXPRESS,
        logs: {
          destination: new cdk.aws_logs.LogGroup(
            this,
            "LongTaskStateMachineLogGroup"
          ),
          level: cdk.aws_stepfunctions.LogLevel.ALL,
          includeExecutionData: true,
        },
      }
    );
// ...
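
For comparison, this is roughly what the same two-state machine could look like as an Amazon States Language document, the form you would keep in a separate .asl.json file. It is a sketch, not the output of the CDK code above: the connection ARN and the endpoint domain are placeholders I made up, and the state names are copied from the construct IDs.

```typescript
// Placeholder values: replace with the real connection ARN and the
// HTTP domain of your Events API.
const CONNECTION_ARN =
  "arn:aws:events:us-east-1:111122223333:connection/EventsAPIConnection/placeholder";
const EVENTS_HTTP_DOMAIN = "example.appsync-api.us-east-1.amazonaws.com";

// Wait 30 seconds, then POST "$.input" to the Events API via the HTTP task.
const definition = {
  StartAt: "ComplexTask",
  States: {
    ComplexTask: {
      Type: "Wait",
      Seconds: 30,
      Next: "PublishToAppSync",
    },
    PublishToAppSync: {
      Type: "Task",
      Resource: "arn:aws:states:::http:invoke",
      Parameters: {
        ApiEndpoint: `https://${EVENTS_HTTP_DOMAIN}/event`,
        Method: "POST",
        Authentication: { ConnectionArn: CONNECTION_ARN },
        "RequestBody.$": "$.input",
      },
      End: true,
    },
  },
};

// Serialized form, ready to be written to a .asl.json file.
const aslJson = JSON.stringify(definition, null, 2);
```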

API Gateway

An API Gateway HTTP API will handle a single endpoint and use a Cognito authorizer to authorize requests.

// lib/appsync_events-stack.ts

// ...
    const httpApi = new cdk.aws_apigatewayv2.HttpApi(this, "HttpApi", {
      corsPreflight: {
        allowOrigins: ["*"],
        allowMethods: [cdk.aws_apigatewayv2.CorsHttpMethod.ANY],
        allowHeaders: ["*"],
      },
    });

    const cognitoAuthorizer =
      new cdk.aws_apigatewayv2_authorizers.HttpUserPoolAuthorizer(
        "CognitoAuthorizer",
        cognitoUserPool,
        {
          userPoolClients: [cognitoUserPoolClient],
        }
      );
// ...

I need a Lambda function to handle the request. The function needs permission to start the state machine execution:

// lib/appsync_events-stack.ts

// ...
    const reportHandlerFunction = new rustLambda.RustFunction(this, "reportHandlerFunction", {
      entry: ("http_handler"),
      environment: {
        STEP_FUNCTION_ARN: longTaskSfn.stateMachineArn,
      },
    });

    longTaskSfn.grantStartExecution(reportHandlerFunction);

    httpApi.addRoutes({
      path: "/generate-report",
      methods: [cdk.aws_apigatewayv2.HttpMethod.POST],
      integration:
        new cdk.aws_apigatewayv2_integrations.HttpLambdaIntegration(
          "reportHandlerIntegration",
          reportHandlerFunction
        ),
      authorizer: cognitoAuthorizer,
    });
// ...

Lambda Function - code

Once the infrastructure is defined, let's create a Lambda function to handle the requests.

// http_handler/src/main.rs
use aws_config::BehaviorVersion;
use lambda_http::{run, service_fn, tracing, Body, Error, Request, RequestExt, Response};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Message {
    message: String,
}

#[derive(Serialize, Deserialize)]
struct AppSyncEventsRequest {
    channel: String,
    events: Vec<String>,
}

#[derive(Serialize, Deserialize)]
struct StepFunctionInput {
    input: AppSyncEventsRequest,
}

async fn function_handler(
    sfn_client: &aws_sdk_sfn::Client,
    event: Request,
) -> Result<Response<Body>, Error> {

    let machine_arn = std::env::var("STEP_FUNCTION_ARN").unwrap();

    let context = event.request_context();

    let authorizer_data = context.authorizer().unwrap();

    let jwt_data = authorizer_data.jwt.clone().unwrap();

    let user_name = jwt_data.claims.get("username").unwrap();

    let message = Message {
        message: "report generated".to_string(),
    };

    let step_function_input = StepFunctionInput {
        input: AppSyncEventsRequest{
            channel: format!("generated-reports/{user_name}"),
            events: vec![serde_json::to_string(&message).unwrap()],
        }
    };

    let result = sfn_client
        .start_execution()
        .set_input(Some(serde_json::to_string(&step_function_input).unwrap()))
        .state_machine_arn(machine_arn)
        .send()
        .await
        .unwrap();

    let message = format!("{:?}", result);

    let resp = Response::builder()
        .status(200)
        .header("content-type", "text/html")
        .body(message.into())
        .map_err(Box::new)?;
    Ok(resp)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();

    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;

    let sfn_client = aws_sdk_sfn::Client::new(&config);

    run(service_fn(|ev| function_handler(&sfn_client, ev))).await
}
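
The handler above unwraps every optional value, so a request without the expected claim would panic instead of returning an error. Here is a defensive variant of the claim lookup, sketched in TypeScript to stay consistent with the CDK code rather than as a drop-in replacement for the Rust function. The event interface is a hand-written slice of the HTTP API payload (format 2.0) and the function name is hypothetical. My understanding is that the username claim appears on Cognito access tokens, while ID tokens carry cognito:username instead.

```typescript
// Minimal slice of an API Gateway HTTP API event with a JWT authorizer.
interface HttpApiEvent {
  requestContext?: {
    authorizer?: {
      jwt?: { claims?: Record<string, unknown> };
    };
  };
}

type UserLookup =
  | { ok: true; userName: string }
  | { ok: false; statusCode: number; reason: string };

// Hypothetical helper: returns the caller's username, or a 401-style
// result when the claim is absent or not a string.
function userNameFromEvent(event: HttpApiEvent): UserLookup {
  const claim = event.requestContext?.authorizer?.jwt?.claims?.["username"];
  if (typeof claim !== "string" || claim.length === 0) {
    return { ok: false, statusCode: 401, reason: "missing username claim" };
  }
  return { ok: true, userName: claim };
}
```

The same idea carries over to the Rust handler by replacing the unwrap calls with ok_or and returning an error response early.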

UI

I've created a new Vue 3 application. I will use the Amplify library, which makes integration with Cognito and AppSync very easy. It can be used even if you are not planning to use the Amplify service to deploy your frontend.

To configure Cognito and AppSync Events, I add the following to main.ts:

// fe/src/main.ts
// ...
import { Amplify } from "aws-amplify";

Amplify.configure({
  API: {
    Events: {
      endpoint:
        "https://<your_endpoint>.appsync-api.us-east-1.amazonaws.com/event",
      region: "us-east-1",
      defaultAuthMode: "userPool",
    },
  },
  Auth: {
    Cognito: {
      userPoolId: "<user_pool>",
      userPoolClientId: "<client_id>",
      identityPoolId: "<identity_id>",
      loginWith: {
        email: true,
      },
      signUpVerificationMethod: "code",
      userAttributes: {
        email: {
          required: true,
        },
      },
      allowGuestAccess: true,
      passwordFormat: {
        minLength: 8,
        requireLowercase: true,
        requireUppercase: true,
        requireNumbers: true,
        requireSpecialCharacters: true,
      },
    },
  },
});
// ...

I wrap the whole application in the authenticator tag in App.vue:

// fe/src/App.vue
// ...
import { Authenticator } from '@aws-amplify/ui-vue'
import "@aws-amplify/ui-vue/styles.css";
</script>

<template>
  <authenticator>
    <div class="wrapper">
      <HelloWorld msg="You did it!" />
    </div>
  </authenticator>
</template>
// ...

To connect to the channel, it is enough to call events.connect

// fe/src/components/HelloWorld.vue
// ...
import { events } from 'aws-amplify/data'
const { user } = useAuthenticator();

onMounted(async () => {

  const channel = await events.connect(`/generated-reports/${user.username}`);

  channel.subscribe({
    next: (data) => {
      console.log('received', data.event);
      reports.value.push(data.event)
    },
    error: (err) => console.error('error', err),
  });

})
// ...

In the real world, you would move the connection part to a separate composable or keep it in the store, but for now, it is enough to keep it directly in the component.
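
One possible shape for that extraction is a small function that takes the connect call as a parameter and hands back a cleanup function. The interfaces below are a hand-written approximation of the parts of the Amplify channel object that the component uses (subscribe, plus unsubscribe and close, which I believe Amplify also exposes); they are assumptions, not Amplify's published types, and watchReports is a hypothetical name.

```typescript
// Approximate shapes of the Amplify objects used by the component.
interface ReportSubscription {
  unsubscribe(): void;
}
interface ReportChannel {
  subscribe(observer: {
    next: (data: { event: unknown }) => void;
    error: (err: unknown) => void;
  }): ReportSubscription;
  close(): void;
}
type Connect = (path: string) => Promise<ReportChannel>;

// Connects to the user's channel, forwards each event to onReport, and
// returns a function that tears the subscription down again.
async function watchReports(
  connect: Connect,
  userName: string,
  onReport: (report: unknown) => void,
): Promise<() => void> {
  const channel = await connect(`/generated-reports/${userName}`);
  const subscription = channel.subscribe({
    next: (data) => onReport(data.event),
    error: (err) => console.error("report channel error", err),
  });
  return () => {
    subscription.unsubscribe();
    channel.close();
  };
}
```

In a component this could be called as watchReports(events.connect, user.username, ...) inside onMounted, with the returned function invoked from onUnmounted so the WebSocket subscription does not outlive the view. Passing connect in as an argument also makes the logic testable without a network.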

Deploy

Once the CDK stack is deployed, we need to make sure that our Connection has the right API key. Grab the key from the AppSync Events console and update it in the Connections section of the EventBridge console.

Test

Start the frontend

cd fe
npm run dev

There is a default Amplify login form. Sign up for the service and log in.

After logging in, I can see my amazing UI, which consists of the user ID and a single button to generate a report.

I can check in the dev tools and see that a WebSocket connection has been opened.

After clicking the button I need to wait for ~30 seconds, and then I receive a message from the channel I subscribed to.

It works 🎉 🎉

Summary

The AppSync Events API makes it possible to set up a WebSocket connection with minimal configuration overhead. Events are published through an HTTPS endpoint, which any authorized service, including Step Functions, can use.

The Amplify library simplifies setting up an authorized connection from the UI to the Events API.
