Loading...
Loading...

Go Distributed Systems Tutorial

Distributed systems in Go leverage the language's excellent concurrency primitives and networking capabilities. This tutorial covers building scalable, fault-tolerant distributed applications from basic concepts to advanced patterns.

1. Distributed Systems Fundamentals

1.1 Key Concepts

  • Consensus: Agreement between nodes
  • Partition Tolerance: System continues despite network failures
  • Consistency Models: Strong vs eventual consistency

1.2 Basic RPC Service

Simple Go RPC server/client:

// server.go
type MathService struct{}

func (m *MathService) Add(args *Args, reply *int) error {
    *reply = args.A + args.B
    return nil
}

func startServer() {
    math := new(MathService)
    rpc.Register(math)
    rpc.HandleHTTP()
    
    listener, _ := net.Listen("tcp", ":1234")
    http.Serve(listener, nil)
}

// client.go
func callAdd(a, b int) int {
    client, _ := rpc.DialHTTP("tcp", "localhost:1234")
    args := &Args{A: a, B: b}
    var reply int
    client.Call("MathService.Add", args, &reply)
    return reply
}

2. Communication Patterns

2.1 HTTP Services

RESTful service with gorilla/mux:

func main() {
    r := mux.NewRouter()
    r.HandleFunc("/items", getItems).Methods("GET")
    r.HandleFunc("/items/{id}", getItem).Methods("GET")
    
    srv := &http.Server{
        Addr:    ":8000",
        Handler: r,
    }
    
    go func() {
        if err := srv.ListenAndServe(); err != nil {
            log.Printf("Server error: %v", err)
        }
    }()
    
    // Graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt)
    <-sigChan
    srv.Shutdown(context.Background())
}

2.2 gRPC Services

High-performance RPC with protobuf:

// service.proto
service UserService {
    rpc GetUser (UserRequest) returns (UserResponse);
}

// server.go
type server struct{}
func (s *server) GetUser(ctx context.Context, req *pb.UserRequest) (*pb.UserResponse, error) {
    return &pb.UserResponse{Id: req.Id, Name: "Alice"}, nil
}

func main() {
    lis, _ := net.Listen("tcp", ":50051")
    s := grpc.NewServer()
    pb.RegisterUserServiceServer(s, &server{})
    s.Serve(lis)
}

3. Distributed Coordination

3.1 Leader Election

Using etcd for leader election:

func runElection() {
    client, _ := clientv3.New(clientv3.Config{
        Endpoints: []string{"localhost:2379"},
    })
    
    session, _ := concurrency.NewSession(client)
    election := concurrency.NewElection(session, "/leader-election/")
    
    ctx := context.Background()
    if err := election.Campaign(ctx, "node1"); err != nil {
        log.Fatal(err)
    }
    
    // This node is now the leader
    log.Println("Elected as leader")
    
    // Keep leadership
    select {}
}

3.2 Distributed Locks

Redis-based distributed lock:

func acquireLock(rdb *redis.Client, lockName string) bool {
    result := rdb.SetNX(context.Background(), 
        lockName, 
        "locked", 
        10*time.Second).Val()
    return result
}

func releaseLock(rdb *redis.Client, lockName string) {
    rdb.Del(context.Background(), lockName)
}

func criticalSection() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    
    if acquireLock(rdb, "my-lock") {
        defer releaseLock(rdb, "my-lock")
        // Perform critical operations
    }
}

4. Data Distribution Patterns

4.1 Consistent Hashing

Implementing consistent hashing:

type Ring struct {
    nodes map[uint32]string
    sync.RWMutex
}

func (r *Ring) AddNode(node string) {
    r.Lock()
    defer r.Unlock()
    
    hash := crc32.ChecksumIEEE([]byte(node))
    r.nodes[hash] = node
}

func (r *Ring) GetNode(key string) string {
    r.RLock()
    defer r.RUnlock()
    
    hash := crc32.ChecksumIEEE([]byte(key))
    for nodeHash, node := range r.nodes {
        if hash <= nodeHash {
            return node
        }
    }
    // Wrap around to first node
    return r.nodes[0]
}

4.2 Sharding

Basic data sharding implementation:

type Shard struct {
    data map[string]interface{}
    sync.RWMutex
}

type ShardedMap struct {
    shards []*Shard
    count  int
}

func NewShardedMap(shardCount int) *ShardedMap {
    shards := make([]*Shard, shardCount)
    for i := 0; i < shardCount; i++ {
        shards[i] = &Shard{data: make(map[string]interface{})}
    }
    return &ShardedMap{shards: shards, count: shardCount}
}

func (m *ShardedMap) getShard(key string) *Shard {
    hash := fnv.New32()
    hash.Write([]byte(key))
    return m.shards[hash.Sum32()%uint32(m.count)]
}

5. Fault Tolerance Patterns

5.1 Circuit Breaker

Implementation using gobreaker:

var cb *gobreaker.CircuitBreaker

func init() {
    cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{
        Name:    "API",
        Timeout: 5 * time.Second,
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            return counts.ConsecutiveFailures > 5
        },
    })
}

func CallAPI() (string, error) {
    result, err := cb.Execute(func() (interface{}, error) {
        resp, err := http.Get("https://api.example.com")
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        
        body, _ := ioutil.ReadAll(resp.Body)
        return string(body), nil
    })
    
    return result.(string), err
}

5.2 Retry Pattern

Exponential backoff retries:

func RetryOperation(ctx context.Context, op func() error, maxRetries int) error {
    backoff := time.Second
    for i := 0; i < maxRetries; i++ {
        err := op()
        if err == nil {
            return nil
        }
        
        select {
        case <-time.After(backoff):
            backoff *= 2
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return fmt.Errorf("max retries reached")
}

6. Event-Driven Architectures

6.1 Message Queues

Using NATS for pub/sub:

func publishEvents() {
    nc, _ := nats.Connect(nats.DefaultURL)
    defer nc.Close()
    
    for i := 0; i < 10; i++ {
        nc.Publish("updates", []byte(fmt.Sprintf("Message %d", i)))
    }
}

func subscribeEvents() {
    nc, _ := nats.Connect(nats.DefaultURL)
    defer nc.Close()
    
    nc.Subscribe("updates", func(m *nats.Msg) {
        fmt.Printf("Received: %s\n", string(m.Data))
    })
    
    select {} // Keep subscription alive
}

6.2 Event Sourcing

Basic event store implementation:

type Event struct {
    ID        string
    Type      string
    Data      []byte
    Timestamp time.Time
}

type EventStore struct {
    events []Event
    mu     sync.Mutex
}

func (es *EventStore) Append(event Event) {
    es.mu.Lock()
    defer es.mu.Unlock()
    es.events = append(es.events, event)
}

func (es *EventStore) Replay(handler func(Event)) {
    es.mu.Lock()
    defer es.mu.Unlock()
    
    for _, event := range es.events {
        handler(event)
    }
}

7. Distributed Systems Best Practices

7.1 Design Principles

  • Idempotency: Design operations to be safely retriable
  • Statelessness: Minimize node-local state when possible
  • Observability: Instrument all components with metrics/logs/tracing

7.2 Common Pitfalls

// ❌ Bad: No timeout in distributed calls
func callServiceBad(url string) ([]byte, error) {
    resp, err := http.Get(url) // No timeout
    // ...
}

// ✅ Good: Proper context timeout
func callServiceGood(ctx context.Context, url string) ([]byte, error) {
    req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
    client := &http.Client{Timeout: 5 * time.Second}
    resp, err := client.Do(req)
    // ...
}

// ❌ Bad: Assuming synchronous consistency
func updateBalanceBad(userID string, amount int) {
    db.Update(userID, amount) // Local DB only
    cache.Invalidate(userID) // Cache might not reflect immediately
}

// ✅ Good: Eventual consistency pattern
func updateBalanceGood(userID string, amount int) {
    event := CreateBalanceEvent(userID, amount)
    eventStore.Append(event) // Single source of truth
    bus.Publish(event)      // Subscribers update their views
}
0 Interaction
0 Views
Views
0 Likes
×
×
×
🍪 CookieConsent@Ptutorials:~

Welcome to Ptutorials

$ Allow cookies on this site ? (y/n)

top-home