Creating a simple Message Bus: Episode 1


Category: Programming
Source: dev.to

In my perpetual quest to be a better engineer and to understand tools/architectures better, I decided to start building stuff.

Build a message bus, database, reverse proxy... etc.
Whatever. Just build something I'm interested in learning more of.

To not think about this as a huge task, I decided to commit myself to build stuff in the simplest way possible. No fancy shenanigans.

Start small and simple. Add more along the way.

I'll be working with Go, not because I'm a Go expert, but because I like it and I feel like it helps with my productivity. I'll probably be learning more about Go along the way. Two birds with one stone kind of thing.

I also want to point out that I write each post right after finishing a portion of the code, sort of journaling my way through it.

That means the code could be incomplete or might not work (as I'm writing this, I'm thinking "but that's what tests are for", but let's leave the tests for some other time). It also means I'll probably be jumping between files a lot.

I wanted to start with a message bus.

Let's define it and start this series of posts by creating the project structure and maybe a bit more.

A message bus is a messaging system that lets different systems communicate with each other by sending and receiving messages.

So this message bus is a system (also called the broker) that allows senders (also called producers) to send messages (just data, it could contain anything) to receivers (also called consumers).

In other words,

  1. A producer prepares a message, hands it to the broker, and says "here, please deliver this message to this destination".
  2. The broker gets the message and delivers it to one or more consumers that are subscribed to said destination.
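The two steps above can be sketched in a few lines of Go before any networking is involved. This is only an in-memory illustration under my own assumptions: the `Broker` type, its `Subscribe` and `Publish` methods, and the simplified `Message` struct are hypothetical names, not the code this series ends up with.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a simplified stand-in for the message type defined later in the post.
type Message struct {
	Topic string
	Data  []byte
}

// Broker keeps a list of subscriber channels per topic.
// Hypothetical in-memory illustration, not the networked broker.
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan Message
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan Message)}
}

// Subscribe registers a consumer for a topic and returns the channel
// on which that consumer will receive messages.
func (b *Broker) Subscribe(topic string) <-chan Message {
	b.mu.Lock()
	defer b.mu.Unlock()

	ch := make(chan Message, 8) // small buffer so Publish does not block in this demo
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// Publish delivers msg to every consumer subscribed to msg.Topic
// and reports how many consumers it was handed to.
func (b *Broker) Publish(msg Message) int {
	b.mu.Lock()
	defer b.mu.Unlock()

	for _, ch := range b.subs[msg.Topic] {
		ch <- msg
	}
	return len(b.subs[msg.Topic])
}

func main() {
	b := NewBroker()
	sales1 := b.Subscribe("sales")
	sales2 := b.Subscribe("sales")
	b.Subscribe("billing") // subscribed to a different topic, receives nothing here

	n := b.Publish(Message{Topic: "sales", Data: []byte("hey")})
	fmt.Println("delivered to", n, "consumers")
	fmt.Println(string((<-sales1).Data), string((<-sales2).Data))
}
```

Both "sales" subscribers get the message while the "billing" subscriber does not, which is the whole idea of routing by destination.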

[Image: Message Bus diagram]

Project layout

So we have three actors: a broker, a consumer, and a producer.

Let's start by creating an empty project structure. I'll call the Go module mbus. Short and nice.

# Create the dir and cd into it
mkdir mbus
cd mbus

# Create the go module
go mod init mbus

# Create the project layout
mkdir cmd internal build
mkdir cmd/{broker,producer,consumer}
mkdir internal/{broker,producer,consumer}

Our base project layout is created.

To make our lives easier, let's create a very simple Makefile:

# Note: recipe lines must be indented with a tab character, not spaces.
.PHONY: all
all: clean build

.PHONY: build
build:
	go build -o build/broker cmd/broker/broker.go
	go build -o build/producer cmd/producer/producer.go
	go build -o build/consumer cmd/consumer/consumer.go

.PHONY: clean
clean:
	rm -f build/broker
	rm -f build/consumer
	rm -f build/producer

So running make in the command line will rebuild our project. You could use something like gowatch, but again I'm keeping it simple.

Message structure

Let's define what "message" is in our application.

  1. It needs to have some data. It could be JSON, it could be a Base64-encoded image... we don't know and we don't care.
  2. It needs to have some sort of destination name, so we know where to send it. In the "message bus" world, it's often called a "topic", or a "routing key" if you want to sound like a real nerd. I like "routing key", but let's use "topic" since it's shorter.

The message will be our contract between all parties, so let's call the package apiv1 and put it inside internal, like so:

mkdir internal/apiv1
touch internal/apiv1/message.go

// internal/apiv1/message.go

package apiv1

type Message struct {
    Data  []byte
    Len   int
    Topic string
}

func NewMessage(topic string, data []byte) *Message {
    return &Message{
        Data:  data,
        Len:   len(data),
        Topic: topic,
    }
}

Nice and simple.

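The Len field is something we might not use. A Go slice already knows its own length (len(msg.Data)), so the field is redundant in memory, but having the length spelled out might come in handy once the message travels over the wire. We'll see; if we don't need it, we can just remove it later on.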
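To see how the constructor and the Len field behave, here is a small standalone sketch. It repeats the Message type from above in package main so it can run on its own; the mutation at the end is my own illustration of why a stored length can go stale.

```go
package main

import "fmt"

// Message and NewMessage are copied from internal/apiv1/message.go
// so that this sketch is self-contained.
type Message struct {
	Data  []byte
	Len   int
	Topic string
}

func NewMessage(topic string, data []byte) *Message {
	return &Message{
		Data:  data,
		Len:   len(data),
		Topic: topic,
	}
}

func main() {
	msg := NewMessage("sales", []byte("hey"))

	// Right after construction, Len matches the slice length.
	fmt.Println(msg.Topic, msg.Len, len(msg.Data)) // sales 3 3

	// Len is only a snapshot: changing Data afterwards does not update it.
	msg.Data = append(msg.Data, '!')
	fmt.Println(msg.Len, len(msg.Data)) // 3 4
}
```

As long as messages are only built through NewMessage and never modified afterwards, the two values stay in sync.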

Now, let's create the "producer" part of the app and call it a day.

Producing messages

If you remember from our intro, a producer is very simple: it has a message and a topic, and it just sends them off to a broker.

Knowing that, let's create a command line app that will accept a host and port pair to point it to the broker, a topic and a message.

// cmd/producer/producer.go
package main

import (
    "flag"
    "log"

    "mbus/internal/producer"
)

var (
    brokerHost string
    brokerPort string
    topic      string
    message    string
)

func main() {
    parseFlags()

    // The constructor in internal/producer is called New (see below).
    client := producer.New(brokerHost, brokerPort)

    err := client.Publish(topic, message)
    if err != nil {
        log.Fatal(err)
    }
}

func parseFlags() {
    flag.StringVar(&brokerHost, "host", "127.0.0.1", "Broker host")
    flag.StringVar(&brokerPort, "port", "9990", "Broker port")
    flag.StringVar(&topic, "topic", "", "Topic to produce the message for")
    flag.StringVar(&message, "message", "", "The message contents")

    flag.Parse()

    if topic == "" {
        log.Fatalf("please provide a topic")
    }

    if message == "" {
        log.Fatalf("please provide a message to be sent")
    }
}

Here's what this file does:

  1. It parses flags to get the command line arguments.
  2. It creates a client using the mbus/internal/producer package (which we'll create after this).
  3. It publishes the message to the topic, using the client.
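If you'd rather have the flag handling return an error instead of exiting the process, one option is a dedicated flag.FlagSet. The sketch below is an alternative I'm assuming for illustration, not what the post's producer uses; the `options` struct and `parseArgs` helper are hypothetical names.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"io"
)

// options mirrors the four flags of the producer command.
type options struct {
	host, port, topic, message string
}

// parseArgs parses args with its own FlagSet and returns an error
// instead of calling log.Fatalf, which makes it easy to call from tests.
func parseArgs(args []string) (options, error) {
	var o options

	fs := flag.NewFlagSet("producer", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage output quiet in this sketch
	fs.StringVar(&o.host, "host", "127.0.0.1", "Broker host")
	fs.StringVar(&o.port, "port", "9990", "Broker port")
	fs.StringVar(&o.topic, "topic", "", "Topic to produce the message for")
	fs.StringVar(&o.message, "message", "", "The message contents")

	if err := fs.Parse(args); err != nil {
		return o, err
	}
	if o.topic == "" {
		return o, errors.New("please provide a topic")
	}
	if o.message == "" {
		return o, errors.New("please provide a message to be sent")
	}
	return o, nil
}

func main() {
	o, err := parseArgs([]string{"-topic", "sales", "-message", "hey"})
	fmt.Println(o.host, o.port, o.topic, o.message, err)

	_, err = parseArgs([]string{"-message", "hey"})
	fmt.Println(err)
}
```

In a real command you would pass os.Args[1:] to parseArgs and decide in main what to do with the error.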

The interesting stuff is in internal/producer/producer.go, which we'll create in a minute. First, I want to show you what a Producer looks like.

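// internal/producer/producer.go
package producer

import ( // used across this file, including the functions shown later
    "errors"
    "mbus/internal/apiv1"
    "mbus/internal/shared/encoder"
    "net"
)

type Producer struct {
    host string
    port string

    conn net.Conn

    encoder encoder.Encoder
}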

  • The first two fields are there to know where the broker is in our network.
  • The third field represents the TCP connection to the broker.
  • The last one is the encoder. More on this below.

In order for us to send a Message object down the wire, we need to properly encode it to binary. We have a bunch of options in Go, but I'll go with msgpack.

The encoder.Encoder is an interface, so we can swap out the msgpack implementation for another one.

I'm used to a lot of OOP, so the encoder lives inside the producer as a field (composition), but I realize that maybe that's not the best way to do things all the time.

But it works for now, so let's leave it be.

# Create a shared folder for all shared things
mkdir -p internal/shared/encoder

The Encoder interface is pretty simple:

// internal/shared/encoder/encoder.go

package encoder

import "mbus/internal/apiv1"

type Encoder interface {
    Encode(*apiv1.Message) ([]byte, error)
}

Let's create a msgpack encoder, but first let's install the msgpack package:

go get -u github.com/vmihailenco/msgpack

// internal/shared/encoder/msgpack.go
package encoder

import (
    "mbus/internal/apiv1"

    "github.com/vmihailenco/msgpack"
)

type MsgpackEncoder struct {
}

func (e *MsgpackEncoder) Encode(msg *apiv1.Message) ([]byte, error) {
    data, err := msgpack.Marshal(msg)
    if err != nil {
        return nil, err
    }

    return data, nil
}

Pretty simple stuff.
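To make the "swap out the implementation" point concrete, here is a rough sketch of a second encoder built on the standard library's encoding/json. The `JSONEncoder` type and the `encodeWith` helper are hypothetical and not part of the mbus code; Message and Encoder are repeated locally so the sketch runs on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message and Encoder mirror the types from the post, redeclared here
// so the sketch is self-contained.
type Message struct {
	Data  []byte
	Len   int
	Topic string
}

type Encoder interface {
	Encode(*Message) ([]byte, error)
}

// JSONEncoder is a hypothetical alternative to MsgpackEncoder.
type JSONEncoder struct{}

// Encode satisfies the Encoder interface using encoding/json.
// Note that encoding/json writes []byte fields as Base64 strings.
func (JSONEncoder) Encode(msg *Message) ([]byte, error) {
	return json.Marshal(msg)
}

// encodeWith stands in for any code that only knows about the interface,
// like the producer does.
func encodeWith(enc Encoder, msg *Message) (string, error) {
	data, err := enc.Encode(msg)
	return string(data), err
}

func main() {
	msg := &Message{Data: []byte("hey"), Len: 3, Topic: "sales"}

	out, err := encodeWith(JSONEncoder{}, msg)
	fmt.Println(out, err) // {"Data":"aGV5","Len":3,"Topic":"sales"} <nil>
}
```

The calling code never changes; only the value passed in as the Encoder does.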

Now let's get back to our producer by creating a constructor function:

// internal/producer/producer.go

func New(host, port string) *Producer {
    return &Producer{
        host:    host,
        port:    port,
        conn:    nil,
        encoder: &encoder.MsgpackEncoder{},
    }
}

Here, we create a new Producer and use MsgpackEncoder for encoding.

Now, let's add a method to the Producer so we can start publishing messages:

// internal/producer/producer.go
func (c *Producer) Publish(topic, message string) error {
    err := c.connect()
    if err != nil {
        return err
    }

    msg := apiv1.NewMessage(topic, []byte(message))

    data, err := c.encoder.Encode(msg)
    if err != nil {
        return err
    }

    n, err := c.conn.Write(data)
    if err != nil {
        return err
    }

    if n != len(data) {
        return errors.New("could not write all data")
    }

    return nil
}

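func (c *Producer) connect() error {
    conn, err := net.Dial("tcp", net.JoinHostPort(c.host, c.port))
    if err != nil {
        // Propagate the dial error. Returning nil here would leave c.conn
        // unset and make the Write call in Publish panic.
        return err
    }

    c.conn = conn
    return nil
}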

Again very simple.

We connect to the broker, create a Message object, encode it, and send it to the broker using the connection established.

That's it. Producer part done. I told you the producer is the easiest one.

Next one will be the broker.

But first, let's at least manually test (since we don't have unit tests, lazy me) that our producer is actually sending stuff somewhere.

For that, we can use netcat. Run this command in another terminal:

nc -l -p 9990 -t 127.0.0.1

This tells netcat (nc) to listen for TCP connections on 127.0.0.1, port 9990, kind of like a temporary test double for our broker. Flags differ between netcat implementations; with the OpenBSD variant, for example, the equivalent would be nc -l 127.0.0.1 9990.

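Now, let's compile our app and run the producer. One caveat: the Makefile also builds the broker and the consumer, so if cmd/broker/broker.go and cmd/consumer/consumer.go don't exist yet, make will stop at the first failing build. Until they do, you can build just the producer with go build -o build/producer cmd/producer/producer.go.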

make
./build/producer -topic sales -message hey

You should see something printed in the terminal where you ran nc.

Done, test coverage 100%.

Jokes aside, we'll probably add tests in another episode.

But for now, we'll call it a day.

Like I said at the start of this post, I'm just starting out, so I don't really know where I'm going with this, and that's part of the fun. But it also means you could find the code not working or incomplete.

Either way, if you find a mistake or have some feedback, I'd love to hear it.

Until then, see you in another episode!
