Skip to main content

function

· 4 min read
Dipjyoti Metia
Lead Software Engineer

What is serverless

Serverless computing is a method of providing backend services on an as-used basis. A serverless provider allows users to write and deploy code without the hassle of worrying about the underlying infrastructure. code executes in a fully managed environment and no need to provision any infrastructure.

Introduction to cloud functions

Google Cloud Functions is a serverless execution environment for building and connecting cloud services. With Cloud Functions you write simple, single-purpose functions that are attached to events emitted from your cloud infrastructure and services. Your Cloud Function is triggered when an event being watched is fired. Your code executes in a fully managed environment. There is no need to provision any infrastructure or worry about managing any servers.

Functions Framework

The Functions Framework lets you write lightweight functions that run in many different environments. Functions framework

package main

import (
"github.com/GoogleCloudPlatform/functions-framework-go/funcframework"
p "github.com/cloudmock"
"golang.org/x/net/context"
"log"
"os"
)

func main() {
ctx := context.Background()
if err := funcframework.RegisterHTTPFunctionContext(ctx, "/", p.GoMock); err != nil {
log.Fatalf("funcframework.RegisterHTTPFunctionContext: %v\n", err)
}
port := "8080"
if envPort := os.Getenv("PORT"); envPort != "" {
port = envPort
}
if err := funcframework.Start(port); err != nil {
log.Fatalf("funcframework.Start: %v\n", err)
}
}

package db

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/cloudmock/config"
"github.com/cloudmock/secret"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

const ENV = "ENVIRONMENT"

func NewDatabaseConnection() *mongo.Collection {
var err error
log.Print("Connecting to mongodb")
conf, err := config.LoadConfigPath("config/app")
if err != nil {
log.Fatalf("")
}
env := os.Getenv(ENV)
var client *mongo.Client

conn, err := secret.GetSecrets()
if err != nil {
log.Fatalf("mongo db secret url failed %v", err)
}
if env == "dev" {
fmt.Println("Connecting to localdb")
client, err = mongo.NewClient(options.Client().SetAuth(
options.Credential{
Username: conf.DBuser,
Password: conf.DBpassword,
}).ApplyURI(conf.DBurl))
} else {
client, err = mongo.NewClient(options.Client().ApplyURI(conn))
}

if err != nil {
log.Fatalf("mongo db client failed %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = client.Connect(ctx)
if err != nil {
log.Fatalf("mongo db connection failed %s", err) //nolint:gocritic
}
return client.Database("function").Collection("payments")
}

package router

import (
"encoding/json"
"github.com/brianvoe/gofakeit/v6"
"net/http"
)

type UserDetails struct {
Name string `json:"name"`
Email string `json:"email"`
Phone string `json:"phone"`
Address string `json:"address"`
Company string `json:"company"`
JobTitle string `json:"jobTitle"`
}

func NewUserWrite() *[]UserDetails {
var usr []UserDetails
for i := 0; i < gofakeit.RandomInt([]int{5, 10, 12, 4, 11}); i++ {
usr = append(usr, UserDetails{
Name: gofakeit.Name(),
Email: gofakeit.Email(),
Phone: gofakeit.Phone(),
Address: gofakeit.Address().Address,
Company: gofakeit.Company(),
JobTitle: gofakeit.JobTitle(),
})
}
return &usr
}

func User() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
jData, err := json.Marshal(NewUserWrite())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write(jData)
}
}

package p

import (
"github.com/cloudmock/router"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/httprate"
"github.com/rs/cors"
"net/http"
"time"
)

func GoMock(w http.ResponseWriter, r *http.Request) {
rc := chi.NewRouter()
conn := db.NewDatabaseConnection()

rc.Use(middleware.RealIP)
rc.Use(middleware.Logger)
rc.Use(httprate.Limit(
2,
1*time.Second,
httprate.WithLimitHandler(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "too many requests", http.StatusTooManyRequests)
}),
))

rc.Route("/api/v1", func(rc chi.Router) {
rc.Get("/users", router.User())
rc.Get("/categories", router.Category())
})

cors.Default().Handler(rc).ServeHTTP(w, r)
}

Deploy cloud function

name: Build and Deploy to CloudFunction

on:
push:
branches: [ main ]

jobs:
deploy:
name: deploy
runs-on: ubuntu-latest
steps:
- uses: google-github-actions/setup-gcloud@master
with:
project_id: ${{ secrets.GCP_PROJECT_ID }}
service_account_key: ${{ secrets.gcp_credentials }}
export_default_credentials: true
- uses: actions/checkout@v2
- name: Deploy serverless function
run: |
gcloud functions deploy "GoMock" \
--runtime go113 --trigger-http \
--allow-unauthenticated \
--region australia-southeast1 \
--update-env-vars MONGODB=${{ secrets.mongo_secret }} \
--max-instances 2 \
--memory 128mb \
--service-account=${{ secrets.service_account }} \
--no-user-output-enabled

Why Mocking using cloud function

Use cases of mocking using cloud function

System Testing

Performance testing

Performance tests check the behaviors of the system when it is under significant load. These tests are non-functional and can have the various form to understand the reliability, stability, and availability of the platform. For instance, it can be observing response times when executing a high number of requests, or seeing how the system behaves with a significant of data.

img.png img.png

Kafka

· 7 min read
Dipjyoti Metia
Lead Software Engineer

What is Apache Kafka?

Apache Kafka is a framework implementation of a software bus using stream-processing. It is an open-source software platform developed by the Apache Software Foundation written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Behind the scenes, Kafka is distributed, scales well, replicates data across brokers (servers), can survive broker downtime, and much more. img.png

Topics, Partitions and Offsets

Topics: A particular stream of data

  • Similar to a table of the database
  • You can have as many topics you can
  • A topic is identified by its name

Topics are split in partitions

  • Each partition is ordered
  • Each message in partition will get an incremental ID called offset
  • Partition 0, 1, 2 ....
  • Order only guaranteed within a partition, not across partitions
  • Data is kept only for a limited time.
  • Once the data is written to a partition it cannot be changed.

Example Scenario : You can have multiple cabs, and each cabs reports its GPS location to kafka. You can have a topic cabs_gps that contains the position of all cabs. Each cab will send a message to kafka every 20 sec, each message will contain the cabID, and the cab location(lat/long)

Brokers & Topics

  • A kafka cluster is composed of multiple brokers(servers)
  • Each broker is identified by its ID(integer)
  • Each broker contains certain topic partitions
  • After connecting to any broker(called a bootstrap broker), you will be connected to the entire cluster
  • A good number to get start is 3 brokers, but some big clusters have more than 100 brokers

Example of topic A with 3 partitions Example of topic B with 2 partitions img.png

Topics replication

  • Topics should have a replication factor >1 (Usually between 2 and 3)

  • This way if one broker is down another broker can serve the data. Example of topic A with replication factor 2 img.png

  • At any time only ONE broker can be a leader for a given partition

  • Only that leader can receive and serve data for a partition.

  • The other broker will synchronize the data.

  • So each partition has one leader and multiple ISR (in-sync-replica) img.png

Producer

  • Producer write data to topics(which is made of partitions)
  • Producer automatically know to which broker and partition to write.
  • In case broker failure, Producers will automatically recover img.png
  • Producers can choose to receive acknowledgment of data writes.
    • acks=0 Producer won't wait for acknowledgment (Possible data loss)
    • acks=1 Producer will wait for leader acknowledgment (Limited data loss)
    • acks=2 Leader & Replica acknowledgment (no data loss)
  • Producer can choose to send a key with the message(string,num etc.)
  • If key==null data will sent round robin(broker 101 then 102 then 103)
  • If key is sent then all message for that key will send to same partition
  • A key is sent if we need a message ordering for a specific field as cabID.
producer.java
@Slf4j
public static void main(String[] args) {
String topic = "second-topic";
String value = "hello kafka";
String bootstrapServer = "127.0.0.1:9092";
// Create producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
log.info("Creating producer");
// Send Data
producer.send(record, (metadata, e) -> {
// Execute every time record is successfully send
if (e == null) {
log.info((metadata.timestamp()));
log.info(topic, metadata.topic());
log.info(metadata.hasOffset());
log.info(metadata.hasTimestamp());
} else {
e.printStackTrace();
}
});
producer.flush();
producer.close();
}

Consumer

  • Consumer read data from a topic(identified by name)
  • Consumer knows which broker to read from
  • In case of broker failure, consumer know how to recover
  • Data is read in order with in each partition img.png
  • Consumer read data in consumer groups
  • Each consumer within a group reads form exclusive partitions
  • If you have more consumers than partitions, some consumers will be inactive
  • Kafka stores the offset at which a consumer group has been reading
  • The offsets committed live in a kafka topic named _consumer_offsets
  • When a consumer in a group has processed the data received from kafka, it should be committing the offsets.
  • If a consumer dies, it will be able to read back from where it left off.
consumer.java
public static void main(String[] args) {

String bootstrapServer = "127.0.0.1:9092";
String groupId = "my-sixth-application";
String topic = "second-topic";

// Create consumer config
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

// subscribe consumer to our topic
consumer.subscribe(Arrays.asList(topic));

// poll for the new data
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
}

Zookeeper

  • Zookeeper manager brokers(keeps a list of them)
  • Zookeeper helps in performing leader election for partition
  • Zookeeper send notifications to kafka in case of any changes.

Schema Registry

  • Kafka takes bytes as an input and publishes them
  • No data verification
  • Schema registry rejects bat data
  • A common data format must be agreed upon img.png
  • Apache avro as data format
    • Data is fully typed
    • Date is compressed automatically
    • Schema comes along with the data
    • Documentation is embedded in the schema
    • Data can be read across any language
    • Schema can be evolved over time in safe manner

Avro

Apache Avro is a data serialization system.

  • Avro provides:
    • Rich data structures.
    • A compact, fast, binary data format.
    • A container file, to store persistent data.
    • Remote procedure call (RPC).
    • Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation as an optional optimization, only worth implementing for statically typed languages.
{"namespace": "dip.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
  • Common Fields:
    • Name: Name of the schema
    • Namespace: (equivalent of package in java)
    • Doc: Documentation to explain your schema
    • Aliases: Optional other name for schema
    • Fields
      • Name: Name of field
      • Doc: Documentation for that field
      • Type: Data type for that field
      • Default: Default value for that field
    • Complex types:
      • Enums

        {
        "type": "enum",
        "name": "Customer Status",
        "symbols": ["BRONZE","SILVER","GOLD"]
        }
      • Arrays

        {
        "type": "array",
        "items": "string"
        }
      • Maps

        {
        "type": "map",
        "values": "string"
        }
      • Unions

        {
        "name": "middle_name",
        "type": [
        "null",
        "string"
        ],
        "default": "null"
        }
      • Calling other schema as type

Kafka Rest Proxy

  • kafka is great for java based consumers/producers
  • Avro support for some languages isn't great, where JSON/HTTP requests are great.
  • Reporting data to Kafka from any frontend app built in any language not supported by official Confluent clients
  • Ingesting messages into a stream processing framework that doesn’t yet support Kafka img.png
  • Perform a comprehensive set of administrative operations through REST APIs, including:
    • Describe, list, and configure brokers
    • Create, delete, describe, list, and configure topics
    • Delete, describe, and list consumer groups
    • Create, delete, describe, and list ACLs
    • List partition reassignments

Github Eyes

· 3 min read
Dipjyoti Metia
Lead Software Engineer

Presenting github eyes, a golang implementation of the github rest apis using Google GitHub sdk to interact with the Github Api, using github apis we can crawl over multiple repository and automate different tasks from creating repo, creating labels, adding milestones, get latest commits, updating workflows, get the project build status etc, below is the basic demonstration of getting list of issues from multiple repos.

image

The go-github library does not directly handle authentication. The easiest and recommended way to do this is using the OAuth2 library, If you have an OAuth2 access token (for example, a personal API token), you can use it with the OAuth2 library. To get the personal api token follow the documentation and Below is the code snippet for authentication using oauth2.

auth.go
package github

import (
"context"

"github.com/google/go-github/v33/github"
"golang.org/x/oauth2"
)

AUthenticating using github access token
// AuthGithubAPI authentication of github api
func AuthGithubAPI(ctx context.Context) *github.Client {
ts := oauth2.StaticTokenSource(
&oauth2.Token{AccessToken: "XXXXXXXXXXXXXXXXXXXXXXX"},
)
tc := oauth2.NewClient(ctx, ts)
return github.NewClient(tc)
}

Getting the list of issues in a repository, here we have created a struct named Issues with the required fields and then created a function ListIssues where we are passing the github api authentication and then client.Issues.ListByRepo is doing the job where underneath its calling Github Issues Api. We can also extend this function by adding filters to get open/closed issues and so on.

issues.go
package github

import (
"context"
"log"
"time"
)

type Issues struct {
ID int64
Title string
State string
CreatedAt time.Time
URL string
}

// ListIssues get list of issues
func ListIssues(repos string) interface{}{
ctx := context.Background()
client := AuthGithubAPI(ctx)
issues, _, err := client.Issues.ListByRepo(ctx, "dipjyotimetia", repos, nil)
if err != nil {
log.Println(err)
}

var issueList []interface{}
for _, v := range issues {
issueList = append(issueList,&Issues{
ID: v.GetID(),
Title: v.GetTitle(),
State: v.GetState(),
CreatedAt: v.GetCreatedAt(),
URL: v.GetHTMLURL(),
})
}
return issueList
}

Main function to drive the show, here we are passing the repo names in an array called repoNames and in a loop calling the the function derived above ListIssues and then generating the result in a json file in local path.

main.go
package main

import (
"encoding/json"
"github.com/goutils/pkg/github"
"io/ioutil"
)

func main() {
repoNames := []string{"HybridTestFramewrok", "MobileTestFramework"}
var result []interface{}
for _, repoName := range repoNames {
result = append(result, repoName, github.ListIssues(repoName))
}

file, _ := json.MarshalIndent(result, "", "")
_ = ioutil.WriteFile("test.json", file, 0644)
}

Example of the exported json data of the ListIssues function for the two repos.

[
"HybridTestFramewrok",
[
{
"ID": 690950907,
"Title": "Add reddis tests support",
"State": "open",
"CreatedAt": "2020-09-02T11:42:07Z",
"URL": "https://github.com/dipjyotimetia/HybridTestFramewrok/issues/65"
},
{
"ID": 690950833,
"Title": "Add ssh login builder",
"State": "open",
"CreatedAt": "2020-09-02T11:42:01Z",
"URL": "https://github.com/dipjyotimetia/HybridTestFramewrok/issues/64"
},
{
"ID": 690950781,
"Title": "Add file reader validations",
"State": "open",
"CreatedAt": "2020-09-02T11:41:55Z",
"URL": "https://github.com/dipjyotimetia/HybridTestFramewrok/issues/63"
},
{
"ID": 690950708,
"Title": "add kafka testing",
"State": "open",
"CreatedAt": "2020-09-02T11:41:48Z",
"URL": "https://github.com/dipjyotimetia/HybridTestFramewrok/issues/62"
},
{
"ID": 690950641,
"Title": "add rabitmq testing support",
"State": "open",
"CreatedAt": "2020-09-02T11:41:43Z",
"URL": "https://github.com/dipjyotimetia/HybridTestFramewrok/issues/61"
}
],
"MobileTestFramework",
[
{
"ID": 793821012,
"Title": "Add AWS Device Farm support",
"State": "open",
"CreatedAt": "2021-01-26T00:19:55Z",
"URL": "https://github.com/dipjyotimetia/MobileTestFramework/issues/88"
}
]
]

Project structure image

Serverless Framework

· One min read
Dipjyoti Metia
Lead Software Engineer

alt text

Where to start?

npm install -g serverless

alt text

  • Create IAM user

  • Setup user access
    serverless config credentials --provider aws --key xxxxxxxxxxxxxx --secret xxxxxxxxxxxxxx

  • Create project
    serverless create --template aws-nodejs --path my-service

  • Serverless yml

  • Serverless Deploy
    serverless deploy -v

alt text

Serverless offline

https://github.com/dherault/serverless-offline

serverless plugin install --name serverless-offline
serverless offline start

alt text

Insomnia

alt text

Serverless dashbird

Mongodb

alt text

$ npm init -y
$ npm i --save-dev serverless-offline
$ npm i --save mongoose dotenv
sls offline start --skipCacheInvalidation