Elixir Phoenix + Kafka Integration — Message Broadcasting Application

This article helps Elixir beginners set up a basic Phoenix application that consumes messages from Kafka and broadcasts them to every browser connected to a Phoenix channel.

Why Elixir?

  • Scalability
  • Functionality
  • Concurrency
  • Interactivity
  • Fault tolerance

I have found Elixir to be a good choice for highly scalable microservices and for any situation where performance and scalability are at a premium, including web applications and IoT projects. Elixir is a dynamic, functional language that runs on the Erlang VM and pairs Erlang's concurrency and fault-tolerance model with a Ruby-inspired syntax. (Just my thoughts.)

This article is purely for learning purposes. Before going forward, here are a few prerequisites:

  • Some knowledge of asdf
  • Some ideas about Elixir, Phoenix, Mix & Erlang

Let's start building the NewsFeed application. For reference, do check out this git repository.

Run the commands below to install the required tools

$ cd newsfeed/
$ asdf install
elixir 1.12.0-otp-24 is already installed
erlang 24.0.1 is already installed
nodejs 16.2.0 is already installed

In my case, the tools are already installed. In your case, asdf will download the binaries and set up all three tools defined in [<directory>/.tool-versions]

Let’s understand Mix first

Mix is a build tool that ships with Elixir and provides tasks for creating, compiling, and testing your application, managing its dependencies, and much more.

Now run the commands below to install Hex and rebar for Mix

$ mix local.hex 
$ mix local.rebar
01:27:56.934 [warn] Description: 'Authenticity is not established by certificate path validation'
Reason: 'Option {verify, verify_peer} and cacertfile/cacerts is missing'
01:27:57.276 [warn] Description: 'Authenticity is not established by certificate path validation'
Reason: 'Option {verify, verify_peer} and cacertfile/cacerts is missing'
01:27:57.339 [warn] Description: 'Authenticity is not established by certificate path validation'
Reason: 'Option {verify, verify_peer} and cacertfile/cacerts is missing'
* creating /Users/<HOME>/.asdf/installs/elixir/1.12.0-otp-24/.mix/rebar
01:27:57.419 [warn] Description: 'Authenticity is not established by certificate path validation'
Reason: 'Option {verify, verify_peer} and cacertfile/cacerts is missing'
01:27:57.451 [warn] Description: 'Authenticity is not established by certificate path validation'
Reason: 'Option {verify, verify_peer} and cacertfile/cacerts is missing'
01:27:57.479 [warn] Description: 'Authenticity is not established by certificate path validation'
Reason: 'Option {verify, verify_peer} and cacertfile/cacerts is missing'
* creating /Users/<HOME>/.asdf/installs/elixir/1.12.0-otp-24/.mix/rebar3

What is Phoenix?

Phoenix is a web development framework written in Elixir which implements the server-side Model View Controller pattern.

Phoenix provides the best of both worlds — high developer productivity and high application performance. It also has some interesting new twists like channels for implementing real-time features and pre-compiled templates for blazing speed.

Install Phoenix

$ mix archive.install hex phx_new 1.5.9
Resolving Hex dependencies...
Dependency resolution completed:
New:
phx_new 1.5.9
* Getting phx_new (Hex package)
All dependencies are up to date
Compiling 10 files (.ex)
Generated phx_new app
Generated archive "phx_new-1.5.9.ez" with MIX_ENV=prod
Are you sure you want to install "phx_new-1.5.9.ez"? [Yn] Y
* creating /Users/<HOME>/.asdf/installs/elixir/1.12.0-otp-24/.mix/archives/phx_new-1.5.9

Create Phoenix Project

$ mix phx.new newsfeed
* creating newsfeed/config/config.exs
* creating newsfeed/config/dev.exs
* creating newsfeed/config/prod.exs
* creating newsfeed/config/prod.secret.exs
* creating newsfeed/config/test.exs
* creating newsfeed/lib/newsfeed/application.ex
* creating newsfeed/lib/newsfeed.ex
* creating newsfeed/lib/newsfeed_web/channels/user_socket.ex
* creating newsfeed/lib/newsfeed_web/views/error_helpers.ex
* creating newsfeed/lib/newsfeed_web/views/error_view.ex
* creating newsfeed/lib/newsfeed_web/endpoint.ex
* creating newsfeed/lib/newsfeed_web/router.ex
* creating newsfeed/lib/newsfeed_web/telemetry.ex
* creating newsfeed/lib/newsfeed_web.ex
* creating newsfeed/mix.exs
* creating newsfeed/README.md
* creating newsfeed/.formatter.exs
* creating newsfeed/.gitignore
* creating newsfeed/test/support/channel_case.ex
* creating newsfeed/test/support/conn_case.ex
* creating newsfeed/test/test_helper.exs
* creating newsfeed/test/newsfeed_web/views/error_view_test.exs
* creating newsfeed/lib/newsfeed/repo.ex
* creating newsfeed/priv/repo/migrations/.formatter.exs
* creating newsfeed/priv/repo/seeds.exs
* creating newsfeed/test/support/data_case.ex
* creating newsfeed/lib/newsfeed_web/controllers/page_controller.ex
* creating newsfeed/lib/newsfeed_web/templates/layout/app.html.eex
* creating newsfeed/lib/newsfeed_web/templates/page/index.html.eex
* creating newsfeed/lib/newsfeed_web/views/layout_view.ex
* creating newsfeed/lib/newsfeed_web/views/page_view.ex
* creating newsfeed/test/newsfeed_web/controllers/page_controller_test.exs
* creating newsfeed/test/newsfeed_web/views/layout_view_test.exs
* creating newsfeed/test/newsfeed_web/views/page_view_test.exs
* creating newsfeed/lib/newsfeed_web/gettext.ex
* creating newsfeed/priv/gettext/en/LC_MESSAGES/errors.po
* creating newsfeed/priv/gettext/errors.pot
* creating newsfeed/assets/webpack.config.js
* creating newsfeed/assets/.babelrc
* creating newsfeed/assets/js/app.js
* creating newsfeed/assets/css/app.scss
* creating newsfeed/assets/js/socket.js
* creating newsfeed/assets/package.json
* creating newsfeed/assets/static/favicon.ico
* creating newsfeed/assets/css/phoenix.css
* creating newsfeed/assets/static/images/phoenix.png
* creating newsfeed/assets/static/robots.txt
Fetch and install dependencies? [Yn] Y
* running mix deps.get
* running mix deps.compile
* running cd assets && npm install && node node_modules/webpack/bin/webpack.js --mode development
We are almost there! The following steps are missing:

    $ cd newsfeed
    $ cd assets && npm install && node node_modules/webpack/bin/webpack.js --mode development

Then configure your database in config/dev.exs and run:

    $ mix ecto.create

Start your Phoenix app with:

    $ mix phx.server

You can also run your app inside IEx (Interactive Elixir) as:

    $ iex -S mix phx.server

We have created a Phoenix project; now let’s quickly start it. First, install the JavaScript dependencies and compile the static assets

$ cd assets && npm install && node node_modules/webpack/bin/webpack.js --mode development
npm ERR! code 1
npm ERR! path /Users/<HOME>/Desktop/codebase/study/elixir-learning/Elixir-phoenix-kafka-integration/newsfeed/assets/node_modules/node-sass

If npm install fails on the node-sass dependency as shown above, open package.json and update the node-sass version to one that supports your Node.js version.

$ cd newsfeed/assets/
$ atom package.json // You could use any other code editor

Now run npm install again

$ cd assets && npm install && node node_modules/webpack/bin/webpack.js --mode development
.....
.....
added 812 packages, and audited 815 packages in 18s

Asset compilation is now complete

Now let's configure Postgres.

$ cd newsfeed/config/
$ atom dev.exs

Update the username, password, and database values to match your local Postgres setup
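
As a rough guide, the Repo section of dev.exs usually looks like the sketch below. The values shown are the defaults I would expect phx.new to generate for an app named newsfeed, so treat them as assumptions and replace them with your own credentials.

```elixir
# newsfeed/config/dev.exs (excerpt; leave the rest of the file as generated)
config :newsfeed, Newsfeed.Repo,
  # Assumed placeholder credentials: use the ones of your local Postgres
  username: "postgres",
  password: "postgres",
  # Database that `mix ecto.create` will create
  database: "newsfeed_dev",
  hostname: "localhost",
  show_sensitive_data_on_connection_error: true,
  pool_size: 10
```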

Create the database for the project's Ecto repository

$ mix ecto.create
Compiling 14 files (.ex)
Generated newsfeed app
The database for Newsfeed.Repo has been created

What is Ecto?

Ecto provides a standardized API and a set of abstractions for talking to all the different kinds of databases so that Elixir developers can query whatever database they’re using by employing similar constructs.

Refer to this to get an idea about the Ecto library

Let’s start the Phoenix project

$ mix phx.server

You’ve now successfully set up the Elixir Phoenix project

Let’s understand the directory structure, starting at the root level

|-- README.md
|-- assets // Front-end sources: JS, CSS and static files
|-- config // Configuration per environment, DB properties
|-- deps // Fetched dependencies
|-- lib // Application and web source code
|-- mix.exs // Project definition, its dependencies and the application to run
|-- priv // Repo migrations and seeds, gettext translations
`-- test // Test cases
-----------------------------------------------------------------
|-- lib
| |-- newsfeed
| | |-- application.ex // Main application module, which starts the supervisor
| | |-- consumer
| | `-- repo.ex
| |-- newsfeed.ex
| |-- newsfeed_web // contains Web resources
| | |-- channels // WebSocket Channels
| | |-- controllers // Rest APIs controller
| | |-- endpoint.ex
| | |-- gettext.ex
| | |-- router.ex
| | |-- telemetry.ex
| | |-- templates // Used by views to produce HTML content
| | `-- views // Views of MVC pattern

Phoenix is based on the MVC architecture. Let’s create the controller, view, and template for our Newsfeed application

newsfeed/lib/newsfeed_web/controllers/newsfeed_controller.ex
newsfeed/lib/newsfeed_web/views/feed_view.ex
newsfeed/lib/newsfeed_web/templates/feed/index.html.eex
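
A minimal sketch of what the controller and view could contain is shown here. The module names NewsfeedWeb.FeedController and NewsfeedWeb.FeedView and the route are my assumptions. Phoenix 1.5 derives the view from the controller's module name (not the file name), and the template folder from the view name, which is why FeedController pairs with feed_view.ex and templates/feed/.

```elixir
# lib/newsfeed_web/controllers/newsfeed_controller.ex
# (module name is an assumption; it must match the view's prefix)
defmodule NewsfeedWeb.FeedController do
  use NewsfeedWeb, :controller

  # Renders templates/feed/index.html.eex through NewsfeedWeb.FeedView
  def index(conn, _params) do
    render(conn, "index.html")
  end
end

# lib/newsfeed_web/views/feed_view.ex
defmodule NewsfeedWeb.FeedView do
  use NewsfeedWeb, :view
end

# In lib/newsfeed_web/router.ex, inside `scope "/", NewsfeedWeb do ... end`,
# a route like this one (an assumed path) points the browser at the controller:
#   get "/", FeedController, :index
```

If you keep the generated PageController route on "/", pick a different path for the feed page instead.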

Let’s check the application now at “http://localhost:4000/”

Let’s integrate with Kafka. There are two ways:

  • Broadway: a library for building data-processing pipelines, with producers for Kafka and other message queues and Pub/Sub systems
  • Kaffe: an out-of-the-box library for Kafka only (used for this demo; I will cover Broadway in another article with Google Cloud and Kafka)

Add Kaffe dependencies

Edit newsfeed/mix.exs
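
The change is a single new entry in the deps list. The version requirement below is an assumption on my side, chosen to be consistent with the kaffe 1.20.0 release that gets resolved in the next step.

```elixir
# newsfeed/mix.exs (excerpt)
defp deps do
  [
    # ...the dependencies generated by phx.new stay unchanged...

    # Kafka client wrapper; the "~> 1.20" requirement is an assumption
    {:kaffe, "~> 1.20"}
  ]
end
```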

Pull the dependency

$ mix deps.get
Resolving Hex dependencies...
Dependency resolution completed:
Unchanged:
connection 1.1.0
cowboy 2.9.0
cowboy_telemetry 0.3.1
cowlib 2.11.0
db_connection 2.4.0
decimal 2.0.0
ecto 3.6.2
ecto_sql 3.6.2
file_system 0.2.10
gettext 0.18.2
jason 1.2.2
mime 1.6.0
phoenix 1.5.9
phoenix_ecto 4.2.1
phoenix_html 2.14.3
phoenix_live_dashboard 0.4.0
phoenix_live_reload 1.3.1
phoenix_live_view 0.15.7
phoenix_pubsub 2.0.0
plug 1.11.1
plug_cowboy 2.5.0
plug_crypto 1.2.2
postgrex 0.15.9
ranch 1.8.0
telemetry 0.4.3
telemetry_metrics 0.6.0
telemetry_poller 0.5.1
New:
brod 3.15.6
crc32cer 0.1.4
kaffe 1.20.0
kafka_protocol 2.3.6
retry 0.14.1
snappyer 1.2.5
supervisor3 1.1.11
* Getting kaffe (Hex package)
* Getting brod (Hex package)
* Getting retry (Hex package)
* Getting kafka_protocol (Hex package)
* Getting supervisor3 (Hex package)
* Getting crc32cer (Hex package)
* Getting snappyer (Hex package)

Let’s run a Kafka server. Here is a quick way to set up Kafka; you only need Docker for this

Create Kafka topic with name "newsfeed" 

In the Elixir application console, you will see logs like the following.

:supervisor: {:local, :brod_sup}
:started: [
pid: #PID<0.1150.0>,
id: :"learning-elixir",
mfargs: {:brod_client, :start_link,
[
[{'localhost', 9092}],
:"learning-elixir",
[
auto_start_producers: false,
allow_topic_auto_creation: false,
begin_offset: -1
]
]},
restart_type: {:permanent, 10},
shutdown: 5000,
child_type: :worker
]

This means the Kafka client started by our Phoenix application is configured to connect to Kafka at localhost on port 9092

Add Kafka Consumer and configure it.
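
A consumer configuration along these lines would be consistent with the log above. The endpoint, topic, and consumer group are inferred from this article's output; the handler module name Newsfeed.Consumer.MessageHandler is hypothetical, so adjust it to whatever module you create under lib/newsfeed/consumer.

```elixir
# newsfeed/config/config.exs (excerpt)
config :kaffe,
  consumer: [
    # Kafka broker(s) to connect to
    endpoints: [localhost: 9092],
    # Topic(s) to consume
    topics: ["newsfeed"],
    # Consumer group used to track offsets; inferred from the client id in the log
    consumer_group: "learning-elixir",
    # Hypothetical module that must export handle_message/1
    message_handler: Newsfeed.Consumer.MessageHandler
  ]
```

Kaffe's README also asks you to start the consumer from your supervision tree, for example by adding `%{id: Kaffe.Consumer, start: {Kaffe.Consumer, :start_link, []}}` to the children list in lib/newsfeed/application.ex.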

Push some messages into Kafka for testing

$ docker run --rm --interactive ches/kafka kafka-console-producer.sh --topic newsfeed --broker-list 192.168.1.4:9092
Lack of burial space is changing age-old funeral practices, and in Japan ???tree burials' are gaining

Check Phoenix server console

%{
headers: [],
key: "",
offset: 3,
partition: 0,
topic: "newsfeed",
ts: 1623334135536,
ts_type: :create,
value: "Lack of burial space is changing age-old funeral practices, and in Japan ???tree burials' are gaining"
}
: Lack of burial space is changing age-old funeral practices, and in Japan ???tree burials' are gaining
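
The output above is what you would get from a handler roughly like this sketch: it inspects the whole message and then prints "key: value" (the key is empty here, hence the leading colon). The module name is hypothetical; Kaffe only requires a handle_message/1 function that returns :ok.

```elixir
# lib/newsfeed/consumer/message_handler.ex (hypothetical file and module name)
defmodule Newsfeed.Consumer.MessageHandler do
  # Called by Kaffe.Consumer once per Kafka message
  def handle_message(%{key: key, value: value} = message) do
    # Print the full message map, then a short "key: value" line
    IO.inspect(message)
    IO.puts("#{key}: #{value}")

    # Returning :ok tells Kaffe the message was processed
    :ok
  end
end
```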

Now let’s configure the socket and channels in our news feed application

Let’s understand the Channel first

Channels are an exciting part of Phoenix that enable soft real-time communication with and between millions of connected clients.

https://hexdocs.pm/phoenix/channels.html

Let’s create a NewsFeed channel to handle and broadcast news to all the connected devices.

Add an entry for news channel in [newsfeed/lib/newsfeed_web/channels/user_socket.ex]

channel "news:*", NewsfeedWeb.FeedChannel
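
The channel module referenced by that entry could be as small as the sketch below. The file path and the decision to accept only the "news:latest" topic are my assumptions, not something the project requires.

```elixir
# lib/newsfeed_web/channels/feed_channel.ex (assumed path)
defmodule NewsfeedWeb.FeedChannel do
  use Phoenix.Channel

  # Any client may join the public "news:latest" topic
  def join("news:latest", _payload, socket) do
    {:ok, socket}
  end

  # Every other "news:*" topic is rejected in this sketch
  def join("news:" <> _other, _payload, _socket) do
    {:error, %{reason: "unknown topic"}}
  end
end
```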

Then edit the client-side socket to join the channel

$ atom newsfeed/assets/js/socket.js

Update newsfeed/assets/js/app.js

import "./socket"

Let’s update newsfeed/lib/newsfeed_web/templates/feed/index.html.eex

This opens a WebSocket connection between each browser and the Phoenix server

Let’s push a message into the Kafka topic

$ docker run --rm --interactive ches/kafka kafka-console-producer.sh --topic newsfeed --broker-list 192.168.1.5:9092
Hello , Learning Phoenix Socket Channel

Now, see how Phoenix pushed the message to the opened “news:latest” WebSocket channel
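
One way the Kafka consumer can hand each message over to the channel is to broadcast it through the endpoint, as in this sketch. The handler module name, the "new_msg" event name, and the body payload key are all assumptions; whatever you choose, the JavaScript in socket.js has to listen for the same event on the "news:latest" channel.

```elixir
# lib/newsfeed/consumer/message_handler.ex (hypothetical module name)
defmodule Newsfeed.Consumer.MessageHandler do
  # Called by Kaffe.Consumer once per Kafka message
  def handle_message(%{value: value}) do
    # Push the Kafka message value to every client joined to "news:latest".
    # "new_msg" and the :body key are assumed names for this sketch.
    NewsfeedWeb.Endpoint.broadcast("news:latest", "new_msg", %{body: value})

    # Returning :ok tells Kaffe the message was processed
    :ok
  end
end
```

Broadcasting through the endpoint means the consumer does not need to know which browsers are connected; Phoenix PubSub delivers the event to every subscriber of the topic.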

Open two browsers, push some more news into the Kafka topic, and check for the messages. You should see every Kafka message in both browsers.
