Notifications "Webhook" Service
Subject to change
Use NATS Streams to receive robust, real-time event notifications from our platform. For some use cases this approach has advantages over traditional webhooks.
Pros of NATS Streams vs. Webhooks
While webhooks are a common way to receive notifications, NATS Streams (powered by NATS JetStream) offer several advantages:
- Guaranteed Delivery: NATS JetStream provides at-least-once or exactly-once delivery semantics, ensuring you don't miss critical events even if your consumer is temporarily offline. Webhooks can be lost if the endpoint is unavailable.
- Persistence: Events are stored in the stream rather than delivered once and discarded, so a consumer that was offline or failed can resume from its last acknowledged message or replay earlier events, subject to the stream's retention settings.
- Scalability and Performance: NATS is designed for high-throughput, low-latency messaging, making it suitable for applications with a large volume of notifications.
- Pull-based Consumption: Consumers connect to the stream and pull messages, which can be simpler to manage in terms of network configuration (no need for publicly accessible webhook endpoints) and backpressure handling.
- Flexible Consumption Models: Support for multiple consumers, consumer groups, and various acknowledgment policies.
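At-least-once delivery, as listed above, means the same event can occasionally reach a consumer more than once, for example when an acknowledgment is lost. A minimal sketch of one way to make handling idempotent follows. It assumes the event_id field shown in the example payloads later in this document is unique per event; the Deduper type is a hypothetical helper, and an in-memory set like this would not survive a restart.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers event IDs that were already processed so that a
// redelivered message can be acknowledged without repeating side effects.
// Hypothetical helper: a real service would likely persist this state.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

// NewDeduper returns an empty Deduper.
func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime records eventID and reports whether it had not been seen before.
func (d *Deduper) FirstTime(eventID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[eventID]; ok {
		return false
	}
	d.seen[eventID] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	// Simulate a redelivery: the same event ID arrives twice.
	for _, id := range []string{"evt_a", "evt_b", "evt_a"} {
		if d.FirstTime(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

In a message handler, a duplicate would still be acknowledged so that it is not redelivered again; only the side effects are skipped.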
When to Choose NATS Streams
Consider NATS Streams when message reliability, replayability, and high performance are critical, or when managing inbound webhook endpoints is complex for your infrastructure.
Getting Started with NATS Streams
Our notification streams are built on NATS JetStream. You'll need a NATS client library for your programming language to connect and consume messages.
Go Consumer Example
Here's a basic example of how to create or update a durable consumer and start consuming messages in Go. It uses the nats.go client library and its jetstream package.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	// Connect to NATS
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		fmt.Println("Error connecting to NATS:", err)
		return
	}
	defer nc.Close()

	// Create the JetStream context
	js, err := jetstream.New(nc)
	if err != nil {
		fmt.Println("Error creating JetStream context:", err)
		return
	}

	// Bound the consumer setup call below to 30 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Example: assume a stream named "NOTIFICATIONS" exists with subjects like "notifications.*".
	// You would typically define your stream and its subjects server-side or via configuration.

	// Consumer configuration (durable consumer)
	consumerConfig := jetstream.ConsumerConfig{
		Durable:   "my-durable-consumer",
		AckPolicy: jetstream.AckExplicitPolicy,
		// Example: consume one specific subject. Use the exact subject string
		// listed under "Available Notification Streams".
		FilterSubject: "notifications.transaction.created.debit",
	}

	// Create or update the consumer.
	// Replace "YOUR_STREAM_NAME" with the actual stream name provided by our service.
	cons, err := js.CreateOrUpdateConsumer(ctx, "YOUR_STREAM_NAME", consumerConfig)
	if err != nil {
		fmt.Println("Error creating/updating consumer:", err)
		return
	}

	// Example: wait for a single message on this filter. Adjust as needed.
	wg := sync.WaitGroup{}
	wg.Add(1)
	var once sync.Once

	fmt.Printf("Listening on subject %s\n", consumerConfig.FilterSubject)

	// Start consuming messages.
	// Consume fetches messages in the background and calls the handler for each one;
	// the handler is responsible for acknowledging.
	cctx, err := cons.Consume(func(msg jetstream.Msg) {
		fmt.Printf("Received message on subject %s: %s\n", msg.Subject(), string(msg.Data()))
		// Acknowledge only after the message has been handled
		if err := msg.Ack(); err != nil {
			fmt.Println("Error acknowledging message:", err)
		}
		// Signal main once; calling wg.Done again for later messages would panic
		once.Do(wg.Done)
	})
	if err != nil {
		fmt.Println("Error starting consumer:", err)
		return
	}
	defer cctx.Stop() // Stop the consumer when done

	wg.Wait() // Wait for the first message to be processed (for this example)
	fmt.Println("Finished processing messages.")
}
In the example above, replace YOUR_STREAM_NAME with the actual name of the NATS stream provided by our service (e.g., SAVA_NOTIFICATIONS). The FilterSubject in ConsumerConfig allows you to subscribe to specific event types.
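NATS filter subjects may contain the standard wildcards: "*" matches exactly one dot-separated token and ">" matches one or more trailing tokens. The sketch below reimplements that matching rule locally, purely to show which event subjects a given filter would select. The subjectMatches function is a hypothetical helper, not part of the NATS client; the subject strings follow the Go example above, and whether wildcard filters are permitted on the service's streams is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches filter, where filter may use
// "*" for exactly one token and ">" (last token only) for one or more tokens.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" must be the final filter token and needs at least one subject token.
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	subject := "notifications.transaction.created.debit"
	for _, filter := range []string{
		"notifications.transaction.created.*", // any created transaction
		"notifications.transaction.>",         // everything under transaction
		"notifications.account.*",             // a different entity
	} {
		fmt.Printf("%-40s matches: %v\n", filter, subjectMatches(filter, subject))
	}
}
```

A narrow filter keeps the consumer's traffic small; a broader one trades that for fewer consumers to manage.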
Available Notification Streams
Notifications are published to specific subjects within a NATS stream. The general format for subjects is:
notifications:<entity>.<action>[.<details>]
Transaction Notifications
Receive updates related to financial transactions.
- notifications:transaction.created.debit: A new debit transaction is created.
- notifications:transaction.created.credit: A new credit transaction is created.
- notifications:transaction.status.updated: The status of an existing transaction has changed (e.g., pending, completed, failed).
Payloads contain detailed information about the transaction. The amount field is an integer in cents.
Example Payload (notifications:transaction.created.debit)
{
  "event_id": "evt_2a9f43ab-0c7c-4a89-9b0e-8a7a2e1a0f6d",
  "event_type": "notifications:transaction.created.debit",
  "timestamp": "2024-08-01T10:30:00Z",
  "data": {
    "transaction_id": "txn_1e7a4f8b-3c2d-4e9a-8f7g-6h5i4j3k2l1m",
    "account_id": "acc_5b6c7d8e-1a2b-3c4d-5e6f-0g1h2i3j4k5l",
    "amount": 5000,
    "currency": "ZAR",
    "description": "Payment for service XYZ",
    "status": "completed",
    "created_at": "2024-08-01T10:29:55Z",
    "metadata": {
      "order_id": "ord_c3b4a5d6-e7f8-9a0b-1c2d-3e4f5a6b7c8d",
      "beneficiary_ref": "INV-12345"
    }
  }
}
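A consumer will usually decode this payload into a typed value before acting on it. The following is a minimal sketch using only the Go standard library. The struct and function names are hypothetical, the field types are inferred from the single example above (in particular, metadata values are assumed to be strings), and the sample input omits the inline "in cents" annotations because JSON has no comment syntax.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// TransactionEvent mirrors the example payload above. Types are inferred
// from that one example and are an assumption, not a published schema.
type TransactionEvent struct {
	EventID   string    `json:"event_id"`
	EventType string    `json:"event_type"`
	Timestamp time.Time `json:"timestamp"`
	Data      struct {
		TransactionID string            `json:"transaction_id"`
		AccountID     string            `json:"account_id"`
		Amount        int64             `json:"amount"` // in cents
		Currency      string            `json:"currency"`
		Description   string            `json:"description"`
		Status        string            `json:"status"`
		CreatedAt     time.Time         `json:"created_at"`
		Metadata      map[string]string `json:"metadata"`
	} `json:"data"`
}

// decodeTransaction parses a raw message body into a TransactionEvent.
func decodeTransaction(raw []byte) (TransactionEvent, error) {
	var ev TransactionEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		return ev, fmt.Errorf("decode transaction event: %w", err)
	}
	return ev, nil
}

func main() {
	// Shortened copy of the example payload.
	raw := []byte(`{
	  "event_id": "evt_2a9f43ab-0c7c-4a89-9b0e-8a7a2e1a0f6d",
	  "event_type": "notifications:transaction.created.debit",
	  "timestamp": "2024-08-01T10:30:00Z",
	  "data": {
	    "amount": 5000,
	    "currency": "ZAR",
	    "status": "completed",
	    "created_at": "2024-08-01T10:29:55Z",
	    "metadata": {"beneficiary_ref": "INV-12345"}
	  }
	}`)
	ev, err := decodeTransaction(raw)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(ev.EventType, ev.Data.Amount, ev.Data.Currency, ev.Data.Metadata["beneficiary_ref"])
}
```

In the Go consumer example, the bytes returned by msg.Data() would be the input to decodeTransaction.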
Account Status Notifications
Get notified about changes to account statuses.
- notifications:account.activated: An account has been activated.
- notifications:account.disabled: An account has been disabled (e.g., due to compliance, user request).
Payloads will include the account ID and the reason for the status change.
Example Payload (notifications:account.activated)
{
  "event_id": "evt_8f0b1b0f-ffac-4e0c-805c-40dc141913f9",
  "event_type": "notifications:account.activated",
  "timestamp": "2024-08-01T11:00:00Z",
  "data": {
    "account_id": "acc_5b6c7d8e-1a2b-3c4d-5e6f-0g1h2i3j4k5l",
    "status": "active",
    "reason_code": "KYB_APPROVED",
    "effective_date": "2024-08-01T10:55:00Z"
  }
}
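Because every example payload in this document shares the same outer fields (event_id, event_type, timestamp, data), one way to handle several event types in a single consumer is to decode the envelope first and the data object second. The sketch below illustrates that pattern with the standard library. The type and function names are hypothetical, and it assumes the account.disabled payload has the same shape as the account.activated example, which this document does not show.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope holds the outer fields; Data is left undecoded until the
// event type is known.
type envelope struct {
	EventID   string          `json:"event_id"`
	EventType string          `json:"event_type"`
	Data      json.RawMessage `json:"data"`
}

// accountStatus mirrors the data object of the account.activated example.
type accountStatus struct {
	AccountID  string `json:"account_id"`
	Status     string `json:"status"`
	ReasonCode string `json:"reason_code"`
}

// describe decodes a raw event and returns a one-line summary, or an error
// for malformed input and event types this sketch does not handle.
func describe(raw []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", fmt.Errorf("decode envelope: %w", err)
	}
	switch env.EventType {
	case "notifications:account.activated", "notifications:account.disabled":
		var st accountStatus
		if err := json.Unmarshal(env.Data, &st); err != nil {
			return "", fmt.Errorf("decode account status: %w", err)
		}
		return fmt.Sprintf("account %s is now %s (%s)", st.AccountID, st.Status, st.ReasonCode), nil
	default:
		return "", fmt.Errorf("unhandled event type %q", env.EventType)
	}
}

func main() {
	raw := []byte(`{
	  "event_id": "evt_8f0b1b0f-ffac-4e0c-805c-40dc141913f9",
	  "event_type": "notifications:account.activated",
	  "data": {"account_id": "acc_1", "status": "active", "reason_code": "KYB_APPROVED"}
	}`)
	summary, err := describe(raw)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(summary)
}
```

Returning an error for unknown event types, rather than silently dropping them, makes it easier to notice when new event types are introduced.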
Account Balance Notifications
Real-time updates when an account balance changes.
- notifications:account.balance.updated: An account balance has been updated.
Payloads include the account ID, previous balance, new balance, change amount, and currency. Balances and the change amount are integers in cents, and transaction_id identifies the associated transaction, if applicable.
Example Payload (notifications:account.balance.updated)
{
  "event_id": "evt_7aa53a6d-5869-c814-5cee-2af0f5d92aa5",
  "event_type": "notifications:account.balance.updated",
  "timestamp": "2024-08-01T11:15:00Z",
  "data": {
    "account_id": "acc_5b6c7d8e-1a2b-3c4d-5e6f-0g1h2i3j4k5l",
    "previous_balance": 100500,
    "current_balance": 95500,
    "change_amount": -5000,
    "currency": "ZAR",
    "transaction_id": "txn_1e7a4f8b-3c2d-4e9a-8f7g-6h5i4j3k2l1m",
    "balance_type": "available"
  }
}
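In the example above the three amounts are related: 100500 + (-5000) = 95500. A consumer can use that relationship as a sanity check before applying an update. The sketch below shows the check together with a helper that renders cents as a decimal string. Both names are hypothetical, and the formatting assumes a currency with two decimal places, as ZAR has.

```go
package main

import "fmt"

// BalanceUpdate holds the numeric fields of the data object in the
// example payload above. All amounts are in cents.
type BalanceUpdate struct {
	PreviousBalance int64  `json:"previous_balance"`
	CurrentBalance  int64  `json:"current_balance"`
	ChangeAmount    int64  `json:"change_amount"`
	Currency        string `json:"currency"`
}

// Consistent reports whether previous balance plus change equals the
// current balance.
func (b BalanceUpdate) Consistent() bool {
	return b.PreviousBalance+b.ChangeAmount == b.CurrentBalance
}

// formatMinor renders an amount in cents as major units with two decimals.
// Assumes a two-decimal currency.
func formatMinor(cents int64) string {
	sign := ""
	if cents < 0 {
		sign = "-"
		cents = -cents
	}
	return fmt.Sprintf("%s%d.%02d", sign, cents/100, cents%100)
}

func main() {
	u := BalanceUpdate{
		PreviousBalance: 100500,
		CurrentBalance:  95500,
		ChangeAmount:    -5000,
		Currency:        "ZAR",
	}
	fmt.Println("consistent:", u.Consistent())
	fmt.Printf("%s %s (change %s)\n", u.Currency, formatMinor(u.CurrentBalance), formatMinor(u.ChangeAmount))
}
```

Keeping amounts as integers in cents until the point of display avoids the rounding errors that floating-point arithmetic would introduce.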