Go Distributed Systems Tutorial
Distributed systems in Go leverage the language's excellent concurrency primitives and networking capabilities. This tutorial covers building scalable, fault-tolerant distributed applications from basic concepts to advanced patterns.
1. Distributed Systems Fundamentals
1.1 Key Concepts
- Consensus: getting a group of nodes to agree on a value or an order of operations even when some nodes fail (e.g. Raft, Paxos)
- Partition Tolerance: the system keeps working when network failures split nodes into groups that cannot reach each other
- Consistency Models: strong consistency (every read sees the latest write) vs eventual consistency (replicas converge over time)
1.2 Basic RPC Service
A simple server and client using the standard library's net/rpc package:
// server.go
// Args is shared by client and server; fields must be exported so the
// gob codec can serialize them.
type Args struct {
	A, B int
}

type MathService struct{}

func (m *MathService) Add(args *Args, reply *int) error {
	*reply = args.A + args.B
	return nil
}

func startServer() {
	rpc.Register(new(MathService))
	rpc.HandleHTTP()
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal(err)
	}
	http.Serve(listener, nil)
}

// client.go
func callAdd(a, b int) int {
	client, err := rpc.DialHTTP("tcp", "localhost:1234")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	args := &Args{A: a, B: b}
	var reply int
	if err := client.Call("MathService.Add", args, &reply); err != nil {
		log.Fatal(err)
	}
	return reply
}
2. Communication Patterns
2.1 HTTP Services
RESTful service with gorilla/mux:
func main() {
	r := mux.NewRouter()
	r.HandleFunc("/items", getItems).Methods("GET")
	r.HandleFunc("/items/{id}", getItem).Methods("GET")

	srv := &http.Server{
		Addr:    ":8000",
		Handler: r,
	}

	go func() {
		// ListenAndServe always returns a non-nil error; ErrServerClosed
		// just means Shutdown was called and is not a real failure.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("Server error: %v", err)
		}
	}()

	// Graceful shutdown on interrupt: stop accepting new connections and
	// give in-flight requests up to 10 seconds to finish.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt)
	<-sigChan

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Printf("Shutdown error: %v", err)
	}
}
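The getItems and getItem handlers are not shown above; a minimal sketch (the in-memory items map is illustrative, and encoding/json handles serialization):
var items = map[string]string{"1": "widget", "2": "gadget"}

func getItems(w http.ResponseWriter, r *http.Request) {
	json.NewEncoder(w).Encode(items)
}

func getItem(w http.ResponseWriter, r *http.Request) {
	// mux.Vars extracts path parameters such as {id} from the matched route.
	id := mux.Vars(r)["id"]
	item, ok := items[id]
	if !ok {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	json.NewEncoder(w).Encode(item)
}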
2.2 gRPC Services
High-performance RPC with protobuf:
// service.proto
syntax = "proto3";

message UserRequest  { string id = 1; }
message UserResponse { string id = 1; string name = 2; }

service UserService {
	rpc GetUser (UserRequest) returns (UserResponse);
}

// server.go
// Embedding the generated UnimplementedUserServiceServer is required by
// protoc-gen-go-grpc and keeps the server forward compatible.
type server struct {
	pb.UnimplementedUserServiceServer
}

func (s *server) GetUser(ctx context.Context, req *pb.UserRequest) (*pb.UserResponse, error) {
	return &pb.UserResponse{Id: req.Id, Name: "Alice"}, nil
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatal(err)
	}
	s := grpc.NewServer()
	pb.RegisterUserServiceServer(s, &server{})
	log.Fatal(s.Serve(lis))
}
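A matching client sketch; the insecure transport credentials are fine for local experiments but never for production:
func callGetUser(id string) (*pb.UserResponse, error) {
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	client := pb.NewUserServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return client.GetUser(ctx, &pb.UserRequest{Id: id})
}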
3. Distributed Coordination
3.1 Leader Election
Using etcd for leader election:
func runElection() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"localhost:2379"},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The session is tied to a lease: if this process dies, the lease
	// expires and leadership passes to another candidate automatically.
	session, err := concurrency.NewSession(client)
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	election := concurrency.NewElection(session, "/leader-election/")
	// Campaign blocks until this node becomes the leader.
	if err := election.Campaign(context.Background(), "node1"); err != nil {
		log.Fatal(err)
	}
	log.Println("Elected as leader")
	// Keep leadership until the process exits.
	select {}
}
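Followers can track the current leader via Observe, which streams an update whenever the leader key changes (a hedged sketch reusing the same clientv3 client):
func watchLeader(client *clientv3.Client) {
	session, err := concurrency.NewSession(client)
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	election := concurrency.NewElection(session, "/leader-election/")
	for resp := range election.Observe(context.Background()) {
		log.Printf("current leader: %s", resp.Kvs[0].Value)
	}
}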
3.2 Distributed Locks
Redis-based distributed lock. A naive release that calls DEL unconditionally can delete a lock that expired and was re-acquired by another client, so the release below verifies a per-holder token first (this is still a single-node lock; see Redlock for multi-node variants):
// acquireLock stores a caller-supplied token; the TTL prevents deadlock
// if the holder crashes before releasing.
func acquireLock(rdb *redis.Client, lockName, token string) bool {
	return rdb.SetNX(context.Background(), lockName, token, 10*time.Second).Val()
}

// releaseScript deletes the lock only if the stored token is ours.
// Running the check and delete as one Lua script keeps them atomic.
var releaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0`)

func releaseLock(rdb *redis.Client, lockName, token string) {
	releaseScript.Run(context.Background(), rdb, []string{lockName}, token)
}

func criticalSection() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	token := uuid.NewString() // unique per acquisition (github.com/google/uuid)
	if acquireLock(rdb, "my-lock", token) {
		defer releaseLock(rdb, "my-lock", token)
		// Perform critical operations
	}
}
4. Data Distribution Patterns
4.1 Consistent Hashing
Implementing consistent hashing. A correct lookup needs the node hashes in sorted order (iterating a Go map visits keys in random order), so the ring keeps a sorted slice alongside the map and binary-searches it:
type Ring struct {
	sync.RWMutex
	nodes  map[uint32]string // hash -> node name
	hashes []uint32          // sorted hashes forming the ring
}

func NewRing() *Ring {
	return &Ring{nodes: make(map[uint32]string)}
}

func (r *Ring) AddNode(node string) {
	r.Lock()
	defer r.Unlock()
	hash := crc32.ChecksumIEEE([]byte(node))
	r.nodes[hash] = node
	r.hashes = append(r.hashes, hash)
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
}

func (r *Ring) GetNode(key string) string {
	r.RLock()
	defer r.RUnlock()
	if len(r.hashes) == 0 {
		return ""
	}
	hash := crc32.ChecksumIEEE([]byte(key))
	// Find the first node hash clockwise from the key's hash.
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= hash })
	if i == len(r.hashes) {
		i = 0 // wrap around to the first node on the ring
	}
	return r.nodes[r.hashes[i]]
}
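A quick usage sketch (node and key names are illustrative; production rings usually hash many virtual nodes per physical node to even out key distribution):
func main() {
	ring := NewRing()
	ring.AddNode("node-a")
	ring.AddNode("node-b")
	ring.AddNode("node-c")
	// The same key always lands on the same node until membership changes.
	fmt.Println(ring.GetNode("user:42"))
}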
4.2 Sharding
Basic data sharding implementation:
type Shard struct {
	data map[string]interface{}
	sync.RWMutex // per-shard lock, so writes to different shards don't contend
}

type ShardedMap struct {
	shards []*Shard
	count  int
}

func NewShardedMap(shardCount int) *ShardedMap {
	shards := make([]*Shard, shardCount)
	for i := 0; i < shardCount; i++ {
		shards[i] = &Shard{data: make(map[string]interface{})}
	}
	return &ShardedMap{shards: shards, count: shardCount}
}

// getShard routes a key to its shard: FNV-1 is a fast non-cryptographic
// hash, and the modulo picks one of the fixed set of shards.
func (m *ShardedMap) getShard(key string) *Shard {
	h := fnv.New32()
	h.Write([]byte(key))
	return m.shards[h.Sum32()%uint32(m.count)]
}
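getShard is only the routing step; Get and Set wrappers on top of it might look like this (method names are mine, not from any library):
func (m *ShardedMap) Set(key string, value interface{}) {
	shard := m.getShard(key)
	shard.Lock()
	defer shard.Unlock()
	shard.data[key] = value
}

func (m *ShardedMap) Get(key string) (interface{}, bool) {
	shard := m.getShard(key)
	shard.RLock()
	defer shard.RUnlock()
	v, ok := shard.data[key]
	return v, ok
}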
5. Fault Tolerance Patterns
5.1 Circuit Breaker
Implementation using gobreaker:
var cb *gobreaker.CircuitBreaker

func init() {
	cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name:    "API",
		Timeout: 5 * time.Second, // how long the breaker stays open before a trial request
		ReadyToTrip: func(counts gobreaker.Counts) bool {
			return counts.ConsecutiveFailures > 5
		},
	})
}

func CallAPI() (string, error) {
	result, err := cb.Execute(func() (interface{}, error) {
		resp, err := http.Get("https://api.example.com")
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()
		body, err := io.ReadAll(resp.Body) // io.ReadAll replaces the deprecated ioutil.ReadAll
		if err != nil {
			return nil, err
		}
		return string(body), nil
	})
	if err != nil {
		// Check err before the type assertion: result is nil when the
		// breaker is open or the call failed, and result.(string) would panic.
		return "", err
	}
	return result.(string), nil
}
5.2 Retry Pattern
Exponential backoff retries:
func RetryOperation(ctx context.Context, op func() error, maxRetries int) error {
	backoff := time.Second
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i == maxRetries-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-time.After(backoff):
			backoff *= 2 // exponential backoff; production code usually adds jitter
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("max retries reached: %w", err)
}
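Typical usage puts a context deadline around the whole retry budget (the URL and the status check here are illustrative):
func fetchWithRetry(url string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return RetryOperation(ctx, func() error {
		resp, err := http.Get(url)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if resp.StatusCode >= 500 {
			return fmt.Errorf("server error: %d", resp.StatusCode)
		}
		return nil
	}, 5)
}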
6. Event-Driven Architectures
6.1 Message Queues
Using NATS for pub/sub:
func publishEvents() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	for i := 0; i < 10; i++ {
		nc.Publish("updates", []byte(fmt.Sprintf("Message %d", i)))
	}
	// Publish buffers writes; Flush pushes them to the server before
	// the deferred Close runs.
	nc.Flush()
}

func subscribeEvents() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	nc.Subscribe("updates", func(m *nats.Msg) {
		fmt.Printf("Received: %s\n", string(m.Data))
	})
	select {} // Keep subscription alive
}
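When several workers should split a stream instead of each receiving every message, NATS queue groups deliver each message to exactly one member of the group (the group name here is illustrative):
func subscribeAsWorker(nc *nats.Conn) {
	nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
		fmt.Printf("Worker got: %s\n", string(m.Data))
	})
}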
6.2 Event Sourcing
Basic event store implementation:
type Event struct {
	ID        string
	Type      string
	Data      []byte
	Timestamp time.Time
}

type EventStore struct {
	events []Event
	mu     sync.Mutex
}

// Append records an event at the end of the log; stored events are
// never modified, only appended.
func (es *EventStore) Append(event Event) {
	es.mu.Lock()
	defer es.mu.Unlock()
	es.events = append(es.events, event)
}

// Replay feeds every stored event to the handler in order; this is how
// current state is rebuilt from the log.
func (es *EventStore) Replay(handler func(Event)) {
	es.mu.Lock()
	defer es.mu.Unlock()
	for _, event := range es.events {
		handler(event)
	}
}
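Replaying is how read models are rebuilt. A sketch with a hypothetical "deposit" event type whose Data field holds the amount as a decimal string (a real store would use a proper codec):
func currentBalance(es *EventStore) int {
	balance := 0
	es.Replay(func(e Event) {
		if e.Type == "deposit" {
			if amt, err := strconv.Atoi(string(e.Data)); err == nil {
				balance += amt
			}
		}
	})
	return balance
}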
7. Distributed Systems Best Practices
7.1 Design Principles
- Idempotency: Design operations so retries are safe; applying the same request twice must have the same effect as applying it once (see the sketch after this list)
- Statelessness: Minimize node-local state when possible
- Observability: Instrument all components with metrics/logs/tracing
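A minimal idempotency sketch: the caller attaches a unique request ID, and the handler remembers which IDs it has already applied. The in-memory map stands in for what would be a durable store in production:
type IdempotentHandler struct {
	mu   sync.Mutex
	seen map[string]bool // request IDs that have already been applied
}

func (h *IdempotentHandler) Apply(requestID string, op func()) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.seen[requestID] {
		return // duplicate delivery: skip, the effect already happened
	}
	op()
	h.seen[requestID] = true
}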
7.2 Common Pitfalls
// ❌ Bad: No timeout in distributed calls
func callServiceBad(url string) ([]byte, error) {
resp, err := http.Get(url) // No timeout
// ...
}
// ✅ Good: Proper context timeout
func callServiceGood(ctx context.Context, url string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
	return nil, err
}
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Do(req)
// ...
}
// ❌ Bad: Assuming synchronous consistency
func updateBalanceBad(userID string, amount int) {
db.Update(userID, amount) // Local DB only
cache.Invalidate(userID) // Cache might not reflect immediately
}
// ✅ Good: Eventual consistency pattern
func updateBalanceGood(userID string, amount int) {
event := CreateBalanceEvent(userID, amount)
eventStore.Append(event) // Single source of truth
bus.Publish(event) // Subscribers update their views
}