Microservices best practice: developing a UsersService with Golang
Mon 27 Jan 2020

Microservices are a very popular architecture, but I don't see many tutorials on the internet, so let me show you what we can do with them. We don't need to do big things. It's like solving any problem: we simplify it as much as possible, and once we've seen the rules, we delegate the big dataset to the computer. The same idea applies to this practice.

So why a users-service? Almost every application has a User model as a top priority. The instructions below give a first glance at microservices in general and Golang in particular. Although this is a users-service, we will need to build several other services to support it. You will see them all at the end of this article.

1. UsersMicroservice

Tools:

  • Gorilla/mux: the synchronous communication layer — HTTP/JSON routing.
  • pq: driver to communicate with the PostgreSQL database.
  • Negroni: middleware manager.
  • sqlx: query executor.

Installation

go get github.com/gorilla/mux
go get github.com/lib/pq
go get github.com/codegangsta/negroni
go get github.com/jmoiron/sqlx

Project structure:

  • models.go
  • app.go
  • main.go
  • cache.go
  • workers.go

models.go

Declare the package and the required imports. In the model we need to execute queries and encrypt passwords.

package main

import (
	"github.com/jmoiron/sqlx"
	"golang.org/x/crypto/bcrypt"
)

The User struct replaces the class of an OOP language. We define type User with a json key and the corresponding db column for each field — this is the mapping.

type User struct {
	ID       int    `json:"id" db:"id"`
	Name     string `json:"name" db:"name"`
	Email    string `json:"email" db:"email"`
	Password string `json:"password" db:"password"`
}

Next, the CRUD operations use sqlx to execute the INSERT, UPDATE, SELECT and DELETE queries.

  • CREATE
    func (u *User) create(db *sqlx.DB) error {
    	hashedPassword, err := bcrypt.GenerateFromPassword(
    		[]byte(u.Password),
    		bcrypt.DefaultCost,
    	)
    	if err != nil {
    		return err
    	}
    	return db.QueryRow("INSERT INTO users(name, email, password) VALUES($1, $2, $3) RETURNING id", u.Name, u.Email, string(hashedPassword)).Scan(&u.ID)
    }​
  • READ: dependency injection — pass in the db handle and fill the user instance with data from the database. The $1 placeholders in the SQL are substituted, much like in Ruby.
    func (u *User) get(db *sqlx.DB) error {
    	return db.Get(u, "SELECT name, email FROM users WHERE id=$1",
    		u.ID)
    }​
  • UPDATE
    func (u *User) update(db *sqlx.DB) error {
    	hashedPassword, err := bcrypt.GenerateFromPassword(
    		[]byte(u.Password),
    		bcrypt.DefaultCost,
    	)
    	if err != nil {
    		return err
    	}
    	_, err = db.Exec("UPDATE users SET name=$1, email=$2, password=$3 WHERE id=$4", u.Name, u.Email, string(hashedPassword), u.ID)
    	return err
    }​
  • DELETE
    func (u *User) delete(db *sqlx.DB) error {
    	_, err := db.Exec("DELETE FROM users WHERE id=$1", u.ID)
    	return err
    }​

The key is the sqlx library for executing queries, with dependency injection to pass the db handle around. Now, listing users:

func list(db *sqlx.DB, start, count int) ([]User, error) {
	users := []User{}
	err := db.Select(&users, "SELECT id, name, email FROM users LIMIT $1 OFFSET $2", count, start)
	if err != nil {
		return nil, err
	}
	return users, nil
}
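To picture what LIMIT $1 OFFSET $2 does, here is an in-memory analogy on a plain slice (a sketch with a hypothetical paginate helper, not the SQL itself):

```go
package main

import "fmt"

// paginate returns up to count items starting at offset start,
// mirroring SELECT ... LIMIT $1 OFFSET $2.
func paginate(items []string, start, count int) []string {
	if start >= len(items) {
		return nil
	}
	end := start + count
	if end > len(items) {
		end = len(items)
	}
	return items[start:end]
}

func main() {
	users := []string{"ann", "bob", "cat", "dan", "eve"}
	fmt.Println(paginate(users, 2, 2)) // [cat dan]
	fmt.Println(paginate(users, 4, 3)) // [eve]
}
```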

Easy? Let's move on to the second step.

app.go

This file receives data and sends it to the data store. Similar to above, we declare the package and the required imports — like "require" in Ruby before implementing a class.

package main

import (
	"database/sql"
	"encoding/json"
	"log"
	"net/http"
	"strconv"

	"github.com/codegangsta/negroni"
	"github.com/gorilla/mux"
	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"
)
  • the App struct, holding the app's dependencies. Identifiers starting with an uppercase letter are exported in Go.
    type App struct {
    	DB     *sqlx.DB
    	Router *mux.Router
    	Cache  Cache
    }​
  • initialize the db connection. The "a" instance is injected into the function and values are assigned to its attributes.
    func (a *App) Initialize(cache Cache, db *sqlx.DB) {
    	a.Cache = cache
    	a.DB = db
    	a.Router = mux.NewRouter()
    	a.initializeRoutes()
    }​
  • prepare routes using mux Router.
    func (a *App) initializeRoutes() {
    	a.Router.HandleFunc("/users", a.getUsers).Methods("GET")
    	a.Router.HandleFunc("/user", a.createUser).Methods("POST")
    	a.Router.HandleFunc("/user/{id:[0-9]+}",
    		a.getUser).Methods("GET")
    	a.Router.HandleFunc("/user/{id:[0-9]+}",
    		a.updateUser).Methods("PUT")
    	a.Router.HandleFunc("/user/{id:[0-9]+}",
    		a.deleteUser).Methods("DELETE")
    }​
  • initialize server
    func (a *App) Run(addr string) {
    	n := negroni.Classic()
    	n.UseHandler(a.Router)
    	log.Fatal(http.ListenAndServe(addr, n))
    }​
  • response types: HTTP (with http error code) & JSON
    func respondWithError(w http.ResponseWriter, code int, message string) {
    	respondWithJSON(w, code, map[string]string{"error": message})
    }
    
    func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
    	response, _ := json.Marshal(payload)
    
    	w.Header().Set("Content-Type", "application/json")
    	w.WriteHeader(code)
    	w.Write(response)
    }​
  • get a single user: validate the ID, try the cache, fall back to the database (returning not found if missing), then cache the result.
    func (a *App) getUser(w http.ResponseWriter, r *http.Request) {
    	vars := mux.Vars(r)
    	id, err := strconv.Atoi(vars["id"])
    	if err != nil {
    		respondWithError(w, http.StatusBadRequest, "Invalid user ID")
    		return
    	}
    
    	if value, err := a.Cache.getValue(id); err == nil && len(value) != 0 {
    		w.Header().Set("Content-Type", "application/json")
    		w.WriteHeader(http.StatusOK)
    		w.Write([]byte(value))
    		return
    	}
    
    	user := User{ID: id}
    	if err := user.get(a.DB); err != nil {
    		switch err {
    		case sql.ErrNoRows:
    			respondWithError(w, http.StatusNotFound, "User not found")
    		default:
    			respondWithError(w, http.StatusInternalServerError, err.Error())
    		}
    		return
    	}
    
    	response, _ := json.Marshal(user)
    	if err := a.Cache.setValue(user.ID, response); err != nil {
    		respondWithError(w, http.StatusInternalServerError, err.Error())
    		return
    	}
    
    	w.Header().Set("Content-Type", "application/json")
    	w.WriteHeader(http.StatusOK)
    	w.Write(response)
    }​
  • get collection of users. 
    func (a *App) getUsers(w http.ResponseWriter, r *http.Request) {
    	count, _ := strconv.Atoi(r.FormValue("count"))
    	start, _ := strconv.Atoi(r.FormValue("start"))
    
    	if count > 10 || count < 1 {
    		count = 10
    	}
    	if start < 0 {
    		start = 0
    	}
    
    	users, err := list(a.DB, start, count)
    	if err != nil {
    		respondWithError(w, http.StatusInternalServerError, err.Error())
    		return
    	}
    
    	respondWithJSON(w, http.StatusOK, users)
    }​
  • create a user: validate the payload, get the next id from the sequence, cache the id with the JSON data, then enqueue the id for the creation worker.
    func (a *App) createUser(w http.ResponseWriter, r *http.Request) {
    	var user User
    	decoder := json.NewDecoder(r.Body)
    	if err := decoder.Decode(&user); err != nil {
    		respondWithError(w, http.StatusBadRequest, "Invalid request payload")
    		return
    	}
    	defer r.Body.Close()
    
    	// get the next id from the Postgres sequence
    	if err := a.DB.Get(&user.ID, "SELECT nextval('users_id_seq')"); err != nil {
    		respondWithError(w, http.StatusInternalServerError, err.Error())
    		return
    	}
    
    	JSONByte, _ := json.Marshal(user)
    	if err := a.Cache.setValue(user.ID, string(JSONByte)); err != nil {
    		respondWithError(w, http.StatusInternalServerError, err.Error())
    		return
    	}
    
    	if err := a.Cache.enqueueValue(createUsersQueue, user.ID); err != nil {
    		respondWithError(w, http.StatusInternalServerError, err.Error())
    		return
    	}
    
    	respondWithJSON(w, http.StatusCreated, user)
    }​
  • update user. 
    func (a *App) updateUser(w http.ResponseWriter, r *http.Request) {
    	vars := mux.Vars(r)
    	id, err := strconv.Atoi(vars["id"])
    	if err != nil {
    		respondWithError(w, http.StatusBadRequest, "Invalid user ID")
    		return
    	}
    
    	var user User
    	decoder := json.NewDecoder(r.Body)
    	if err := decoder.Decode(&user); err != nil {
    		respondWithError(w, http.StatusBadRequest, "Invalid request payload")
    		return
    	}
    	defer r.Body.Close()
    	user.ID = id
    
    	if err := user.update(a.DB); err != nil {
    		respondWithError(w, http.StatusInternalServerError, err.Error())
    		return
    	}
    
    	respondWithJSON(w, http.StatusOK, user)
    }​
  • delete user
    func (a *App) deleteUser(w http.ResponseWriter, r *http.Request) {
    	vars := mux.Vars(r)
    	id, err := strconv.Atoi(vars["id"])
    	if err != nil {
    		respondWithError(w, http.StatusBadRequest, "Invalid User ID")
    		return
    	}
    
    	user := User{ID: id}
    	if err := user.delete(a.DB); err != nil {
    		respondWithError(w, http.StatusInternalServerError, err.Error())
    		return
    	}
    
    	respondWithJSON(w, http.StatusOK, map[string]string{"result": "success"})
    }
    ​

It may seem like a lot for one file, but if we look at the bullets, the code is clear, simple and focused — each function has a single responsibility (SRP).

main.go

The name describes its functionality: running the microservice itself.

  • prepare
    package main
    
    import (
    	"flag"
    	"fmt"
    	"log"
    	"os"
    
    	"github.com/jmoiron/sqlx"
    	_ "github.com/lib/pq"
    )
    
    const (
    	createUsersQueue = "CREATE_USER"
    	updateUsersQueue = "UPDATE_USER"
    	deleteUsersQueue = "DELETE_USER"
    )​
  • main
    func main() {
    	var numWorkers int
    	cache := Cache{Enable: true}
    	flag.StringVar(&cache.Address, "redis_address", os.Getenv("APP_RD_ADDRESS"), "Redis Address")
    	flag.StringVar(&cache.Auth, "redis_auth", os.Getenv("APP_RD_AUTH"), "Redis Auth")
    	flag.StringVar(&cache.DB, "redis_db_name", os.Getenv("APP_RD_DBNAME"), "Redis DB name")
    	flag.IntVar(&cache.MaxIdle, "redis_max_idle", 100, "Redis Max Idle")
    	flag.IntVar(&cache.MaxActive, "redis_max_active", 100, "Redis Max Active")
    	flag.IntVar(&cache.IdleTimeoutSecs, "redis_timeout", 60, "Redis timeout in seconds")
    	flag.IntVar(&numWorkers, "num_workers", 10, "Number of workers to consume queue")
    	flag.Parse() // parse the flags; without this they keep their default values
    	cache.Pool = cache.NewCachePool()
    
    	connectionString := fmt.Sprintf(
    		"user=%s password=%s dbname=%s sslmode=disable",
    		os.Getenv("APP_DB_USERNAME"),
    		os.Getenv("APP_DB_PASSWORD"),
    		os.Getenv("APP_DB_NAME"),
    	)
    
    	db, err := sqlx.Open("postgres", connectionString)
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	go UsersToDB(numWorkers, db, cache, createUsersQueue)
    	go UsersToDB(numWorkers, db, cache, updateUsersQueue)
    	go UsersToDB(numWorkers, db, cache, deleteUsersQueue)
    
    	a := App{}
    	a.Initialize(cache, db)
    	a.Run(":8080")
    }
    ​

In Go, the whole application starts by executing the main function. The last line runs the application server on port 8080.

cache.go

go get github.com/garyburd/redigo/redis
  • declaration. Address: the Redis address; Auth: the Redis password; DB: the Redis database to select; MaxIdle/MaxActive: connection pool limits.
    package main
    
    import (
    	"log"
    	"time"
    
    	redigo "github.com/garyburd/redigo/redis"
    )
    
    type Pool interface {
    	Get() redigo.Conn
    }
    
    type Cache struct {
    	Enable          bool
    	MaxIdle         int
    	MaxActive       int
    	IdleTimeoutSecs int
    	Address         string
    	Auth            string
    	DB              string
    	Pool            *redigo.Pool
    }​
  • create new connection pool
    // NewCachePool return a new instance of the redis pool
    func (cache *Cache) NewCachePool() *redigo.Pool {
    	if cache.Enable {
    		pool := &redigo.Pool{
    			MaxIdle:     cache.MaxIdle,
    			MaxActive:   cache.MaxActive,
    			IdleTimeout: time.Second * time.Duration(cache.IdleTimeoutSecs),
    			Dial: func() (redigo.Conn, error) {
    				c, err := redigo.Dial("tcp", cache.Address)
    				if err != nil {
    					return nil, err
    				}
    				if _, err = c.Do("AUTH", cache.Auth); err != nil {
    					c.Close()
    					return nil, err
    				}
    				if _, err = c.Do("SELECT", cache.DB); err != nil {
    					c.Close()
    					return nil, err
    				}
    				return c, err
    			},
    			TestOnBorrow: func(c redigo.Conn, t time.Time) error {
    				_, err := c.Do("PING")
    				return err
    			},
    		}
    		c := pool.Get() // Test connection during init
    		if _, err := c.Do("PING"); err != nil {
    			log.Fatal("Cannot connect to Redis: ", err)
    		}
    		return pool
    	}
    	return nil
    }​
  • redis get/set
    func (cache *Cache) getValue(key interface{}) (string, error) {
    	if cache.Enable {
    		conn := cache.Pool.Get()
    		defer conn.Close()
    		value, err := redigo.String(conn.Do("GET", key))
    		return value, err
    	}
    	return "", nil
    }
    
    func (cache *Cache) setValue(key interface{}, value interface{}) error {
    	if cache.Enable {
    		conn := cache.Pool.Get()
    		defer conn.Close()
    		_, err := redigo.String(conn.Do("SET", key, value))
    		return err
    	}
    	return nil
    }​

Caching acts as the first level of storage: every request goes to the cache first and, for writes, is enqueued at the same time to be persisted to the real database.

func (cache *Cache) enqueueValue(queue string, uuid int) error {
	if cache.Enable {
		conn := cache.Pool.Get()
		defer conn.Close()
		_, err := conn.Do("RPUSH", queue, uuid)
		return err
	}
	return nil
}​
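RPUSH and BLPOP together give FIFO queue semantics: producers push onto the tail, consumers pop from the head and block while the queue is empty. A buffered Go channel models the same behaviour in memory (a sketch, not the Redis-backed implementation above):

```go
package main

import "fmt"

func main() {
	// A buffered channel stands in for the Redis list:
	// sends behave like RPUSH, receives like BLPOP.
	queue := make(chan int, 10)

	// Producer: enqueue three user IDs.
	for id := 1; id <= 3; id++ {
		queue <- id
	}
	close(queue)

	// Consumer: drain in FIFO order.
	for id := range queue {
		fmt.Println(id)
	}
	// Prints 1, 2, 3.
}
```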

Scenarios:

  • POST/PUT/DELETE:
    • the request is registered in two places (the whole payload in the cache & the id in a queue).
    • workers process the message queue, checking the cached data first to identify the request. (1)
    • the action is performed against the real database if (1) is valid.
  • GET:
    • fetch the data from the cache.
    • Not found? Go to the database and search.
    • Found there? Cache the result.

Why do we put the request in the cache first for POST/PUT/DELETE? This is the magic: posted data is available instantly, even before it has been persisted to the database. This is called the "cache-first strategy".
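The cache-first flow can be sketched with an in-memory map standing in for Redis (a hypothetical fakeCache type for illustration; the real service uses the redigo-backed Cache):

```go
package main

import "fmt"

// fakeCache is an in-memory stand-in for the Redis cache.
type fakeCache struct {
	store map[int]string
	queue []int
}

func (c *fakeCache) setValue(key int, value string) { c.store[key] = value }

func (c *fakeCache) getValue(key int) (string, bool) {
	v, ok := c.store[key]
	return v, ok
}

func (c *fakeCache) enqueue(key int) { c.queue = append(c.queue, key) }

func main() {
	c := &fakeCache{store: map[int]string{}}

	// POST: write to the cache and enqueue the id for later persistence.
	c.setValue(1, `{"id":1,"name":"Will"}`)
	c.enqueue(1)

	// GET: the data is readable immediately, before any DB write happened.
	if v, ok := c.getValue(1); ok {
		fmt.Println(v)
	}
	fmt.Println(c.queue) // IDs waiting for a worker: [1]
}
```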

workers.go

There are two parts to processing data:

  1. Receiving the data and caching it.
  2. Processing the queue: the CRUD operations against the database are performed here.

Now let's see what we need to do in this file.

  • imports
    package main
    
    import (
    	"encoding/json"
    	redigo "github.com/garyburd/redigo/redis"
    	"github.com/jmoiron/sqlx"
    	"log"
    	"sync"
    )
    
    type Worker struct {
    	cache Cache
    	db    *sqlx.DB
    	id    int
    	queue string
    }​
  • initialize worker
    func newWorker(id int, db *sqlx.DB, cache Cache,
    	queue string) Worker {
    	return Worker{cache: cache, db: db, id: id, queue: queue}
    }​
  • the worker's process loop: pop an id from the queue and persist the data to the database; on failure, push the id back onto the queue. Go has no WHILE keyword — a FOR with no condition never breaks, so this goroutine polls w.cache continuously. (Note: as written the worker always calls user.create; the update and delete queues would need their own handling.)
    func (w Worker) process(id int) {
    	for {
    		conn := w.cache.Pool.Get()
    		var channel string
    		var uuid int
    		if reply, err := redigo.Values(conn.Do("BLPOP", w.queue,
    			30+id)); err == nil {
    
    			if _, err := redigo.Scan(reply, &channel, &uuid); err != nil {
    				w.cache.enqueueValue(w.queue, uuid)
    				continue
    			}
    
    			values, err := redigo.String(conn.Do("GET", uuid))
    			if err != nil {
    				w.cache.enqueueValue(w.queue, uuid)
    				continue
    			}
    
    			user := User{}
    			if err := json.Unmarshal([]byte(values), &user); err != nil {
    				w.cache.enqueueValue(w.queue, uuid)
    				continue
    			}
    
    			log.Println(user)
    			if err := user.create(w.db); err != nil {
    				w.cache.enqueueValue(w.queue, uuid)
    				continue
    			}
    
    		} else if err != redigo.ErrNil {
    			log.Fatal(err)
    		}
    		conn.Close()
    	}
    }​
  • create workers to consume the queues. Each iteration adds one to the WaitGroup and starts a goroutine. Because process loops forever and never returns, the counter never reaches zero and wg.Wait() blocks indefinitely — which is fine, since UsersToDB itself runs as a goroutine and this simply keeps its workers alive.
    func UsersToDB(numWorkers int, db *sqlx.DB, cache Cache,
    	queue string) {
    	var wg sync.WaitGroup
    	for i := 0; i < numWorkers; i++ {
    		wg.Add(1)
    		go func(id int, db *sqlx.DB, cache Cache, queue string) {
    			defer wg.Done() // deferred first, so it runs even if process panics
    			worker := newWorker(id, db, cache, queue)
    			worker.process(id)
    		}(i, db, cache, queue)
    	}
    	wg.Wait()
    }​

This microservice was simple to build. The next challenge is using Docker to containerize our service.

2. Docker

Dockerfile for app

This file assembles our container's image — gathering everything in one place to tell Docker the necessary information about this system.

  • base image (OS plus the Go toolchain)
    FROM golang:latest​
  • declare app name
    LABEL Name=userservice Version=0.0.1​
  • create workspace
    RUN mkdir -p /go/src \
      && mkdir -p /go/bin \
      && mkdir -p /go/pkg
  • app path and system path
    # Path of application for building process.
    ENV GOPATH=/go
    # Associate with system PATH of "container"
    ENV PATH=$GOPATH/bin:$PATH​
  • app directory
    RUN mkdir -p $GOPATH/src/app​
  • add the sources, build the binary, and run it; the server listens on port 8080 (the endpoint)
    # Indicate the container location on this application within the os of 
    # container.
    ADD . $GOPATH/src/app
    
    # Define working directory
    WORKDIR $GOPATH/src/app
    
    # Run the build to create main and run main by CMD.
    RUN go build -o main .
    CMD ["/go/src/app/main"]
    
    # Port for accessing application endpoints (the app listens on 8080).
    EXPOSE 8080

Dockerfile for PostgreSQL

  • define repository to download postgresql
    FROM postgres​
  • run sql when initializing docker
    # Run create.sql on init
    ADD create.sql /docker-entrypoint-initdb.d​

create.sql

  • create databases
    CREATE DATABASE users_prod;
    CREATE DATABASE users_dev;
    CREATE DATABASE users_test;​

docker-compose.yml

This file is for orchestration — a tool for defining and running multi-container Docker applications. Use this one file to run the whole application.

  • define version and services.
    version: '2.1'
    services:​
  • redis service.
    services:
     redis:
      container_name: redis
      image: redis
      ports:
       - "6379:6379"
      healthcheck:
       test: exit 0​
  • users-service-db (declared under the services tag). The environment section works like a bashrc; the build entry tells Compose where to find the Dockerfile.
    users-service-db:
      container_name: users-service-db
      build: ./db
      ports:
       - 5435:5432 # expose ports - HOST:CONTAINER
      environment: 
       - POSTGRES_USER=postgres
       - POSTGRES_PASSWORD=postgres
      healthcheck:
       test: exit 0​
  • usersservice.
    userservice:
        container_name: users-service
        image: userservice
        build: ./UsersService
        environment:
          - APP_RD_ADDRESS=redis:6379
          - APP_RD_AUTH=password
          - APP_RD_DBNAME=0
          - APP_SETTINGS=project.config.DevelopmentConfig
          - DATABASE_URL=postgres://postgres:postgres@users-service-db:5432/users_prod?sslmode=disable
          - DATABASE_DEV_URL=postgres://postgres:postgres@users-service-db:5432/users_dev?sslmode=disable
          - DATABASE_TEST_URL=postgres://postgres:postgres@users-service-db:5432/users_test?sslmode=disable
        depends_on:
          users-service-db:
            condition: service_healthy
          redis:
            condition: service_healthy
        links:
          - users-service-db
          - redis

3. Using the containers

  • we want to run Docker as a service, so let's create a host with docker-machine.
    docker-machine create dev​
  • point your shell at the generated host URL.
    eval "$(docker-machine env dev)"​
  • create and initialize containers (main command to start everything)
    docker-compose up -d --build
  • shut down Redis on your Mac if you run into a port conflict.
    redis-cli shutdown​
  • some useful commands.
    # list images
    docker images -a
    
    docker system prune
    
    # remove image
    docker rmi nginx
    
    # remove all images
    docker rmi $(docker images -a -q)
    
    docker ps -a
    
    docker stop $(docker ps -a -q)
    docker rm $(docker ps -a -q)
    
    docker volume ls
    
    docker rm -v container_name​

4. Nginx as load balancer

  • dockerfile.
    FROM nginx
    COPY nginx.conf /etc/nginx/nginx.conf​
  • nginx.conf. It depends on how many users-service servers you run and on the scaling types I introduced in http://www.guruonrails.com/the-microservice-introduction-comparing-with-monolithic-applications-and-tools . We set a suitable upstream configuration; for now we have only one users-service server. Note that users-service is the container name.
    worker_processes 4;
     
    events { worker_connections 1024; }
     
    http {
        sendfile on;
     
        upstream user_servers {
            server users-service:8080;
        }
     
        server {
            listen 80;
     
            location / {
                proxy_pass http://user_servers;
                proxy_redirect off;
                proxy_set_header Host $host;
                proxy_set_header X-Real-IP $remote_addr;
                proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
                proxy_set_header X-Forwarded-Host $server_name;
            }
        }
    }​

5. Using Makefile

Using a Makefile we gather all the necessary commands into one place. Email me if you want to know this file's content (lol).

6. Test

  • POST.
    curl -X POST -d "{\"name\":\"Will\"}" http://localhost:80/user​
  • GET.
    curl -X GET http://localhost:80/users​
➜  guruonrails git:(master) ✗ curl -X POST -d "{\"name\":\"Will\"}" http://localhost:80/user
{"id":1,"name":"Will","email":"","password":""}

➜  guruonrails git:(master) ✗ curl -X GET http://localhost:80/users
[{"id":2,"name":"Will","email":"","password":""}]

7. Architecture

Caching first

 

Containers


8. Scaling

Read this article for more information: http://www.guruonrails.com/the-microservice-introduction-comparing-with-monolithic-applications-and-tools

We scale along the x-axis, which means cloning users-service. Let's have a look.

  • clone users-service by adding a new entry to docker-compose.yml — the same service source but a different service name. Just copy the users-service block, paste it, and change its name.
    userservice2:
        image: userservice2
        container_name: users-service2
        build: ./UsersService
        environment:
          - APP_RD_ADDRESS=redis:6379
          - APP_RD_AUTH=password
          - APP_RD_DBNAME=0
          - APP_SETTINGS=project.config.DevelopmentConfig
          - DATABASE_URL=postgres://postgres:postgres@users-service-db:5432/users_prod?sslmode=disable
          - DATABASE_DEV_URL=postgres://postgres:postgres@users-service-db:5432/users_dev?sslmode=disable
          - DATABASE_TEST_URL=postgres://postgres:postgres@users-service-db:5432/users_test?sslmode=disable
        depends_on:
          users-service-db:
            condition: service_healthy
          redis:
            condition: service_healthy
        links:
          - users-service-db
          - redis​
  • nginx.conf: add the clone to the upstream block.
    upstream user_servers {
       server users-service:8080;
       server users-service2:8080;
    }

That's it — scaling is easy: the same source code deployed to different containers, with a load balancer distributing the requests. Awesome!

This article introduced one of several methods for configuring nginx as a load balancer.

nginx load balancing uses the round-robin algorithm by default when no other method is defined, as in the example above. With the round-robin scheme, each server is selected in turn, in the order you list them in the file. This balances the number of requests evenly across short operations.
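Round robin itself is simple enough to sketch in a few lines of Go (a standalone illustration with hypothetical backend names, not nginx's implementation):

```go
package main

import "fmt"

// roundRobin cycles through a fixed list of backends in order.
type roundRobin struct {
	backends []string
	next     int
}

func (rr *roundRobin) pick() string {
	b := rr.backends[rr.next%len(rr.backends)]
	rr.next++
	return b
}

func main() {
	rr := &roundRobin{backends: []string{"users-service:8080", "users-service2:8080"}}
	for i := 0; i < 4; i++ {
		fmt.Println(rr.pick())
	}
	// Alternates: users-service, users-service2, users-service, users-service2.
}
```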

In this practice we built more than just the users-service; the final system is composed of these services:

  1. userservice_loadbalance
  2. users-service2
  3. users-service
  4. redis 
  5. users-service-db

In the next article I will show you how to build other microservices that interact with users-service.

Thanks for reading!