Sync package in Go

Learn about concurrency primitives such as WaitGroup, Mutex, Cond, Pool, Once, and Map

As we learned earlier, goroutines run in the same address space, so access to shared memory must be synchronized. The sync package provides useful primitives.

WaitGroup

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

Usage

sync.WaitGroup provides the following methods:

  • Add(delta int) adds delta to the WaitGroup counter, which is usually the number of goroutines that the waitgroup has to wait for. It must be called before the corresponding goroutine is started.
  • Done() is called within the goroutine to signal that it has finished. It decrements the counter by one.
  • Wait() blocks until the counter reaches zero, that is, until all the goroutines accounted for by Add() have called Done().

Example

Let's take a look at an example.

package main

import (
    "fmt"
    "sync"
)

func work() {
    fmt.Println("working...")
}

func main() {
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        work()
    }()

    wg.Wait()
}

If we run this, we can see that our program runs as expected.

$ go run main.go
working...

We can also pass the waitgroup to the function directly.

func work(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("working...")
}

func main() {
    var wg sync.WaitGroup

    wg.Add(1)

    go work(&wg)

    wg.Wait()
}

But it is important to know that a waitgroup must not be copied after first use. If it's explicitly passed into functions, it should be passed by pointer. Otherwise, the function would call Done on its own copy of the counter, and Wait in the caller would never return.

Let's also increase the number of goroutines and update our waitgroup Add method to wait for 4 goroutines.

func main() {
    var wg sync.WaitGroup

    wg.Add(4)

    go work(&wg)
    go work(&wg)
    go work(&wg)
    go work(&wg)

    wg.Wait()
}

And as expected, all our goroutines were executed.

$ go run main.go
working...
working...
working...
working...
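When the number of goroutines is only known at run time, a common pattern is to call Add(1) right before starting each goroutine inside a loop. Here is a minimal sketch of that pattern. The sumSquares helper and its input are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares is a hypothetical helper: it squares each input in its own
// goroutine and waits for all of them before summing the results.
func sumSquares(nums []int) int {
	var wg sync.WaitGroup
	results := make([]int, len(nums))

	for i, n := range nums {
		wg.Add(1) // Add is called before the goroutine starts.
		go func(i, n int) {
			defer wg.Done()
			results[i] = n * n // Each goroutine writes only to its own slot.
		}(i, n)
	}

	wg.Wait() // Blocks until every goroutine has called Done.

	total := 0
	for _, r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}))
}
```

Because every goroutine writes to a different slice element, no extra locking is needed here. The waitgroup only makes sure we read the results after all writes are done.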

Mutex

A Mutex is a mutual exclusion lock. It prevents other goroutines from entering a critical section while one goroutine occupies it, which prevents race conditions from happening.

What's a critical section?

A critical section is a piece of code that must not be run by multiple goroutines at once because it accesses shared resources.

Usage

sync.Mutex provides the following methods:

  • Lock() acquires or holds the lock.
  • Unlock() releases the lock.
  • TryLock() tries to lock and reports whether it succeeded.
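To get a feel for how TryLock differs from Lock, here is a small sketch. It assumes Go 1.18 or later, where TryLock was added, and the tryTwice helper is a hypothetical name used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// tryTwice is a hypothetical helper that reports what TryLock returns
// while the mutex is held and again after it has been released.
func tryTwice() (whileHeld, afterUnlock bool) {
	var m sync.Mutex

	m.Lock()
	whileHeld = m.TryLock() // false: the lock is already held, TryLock does not block.
	m.Unlock()

	afterUnlock = m.TryLock() // true: the lock was free, so we now hold it.
	if afterUnlock {
		m.Unlock()
	}
	return whileHeld, afterUnlock
}

func main() {
	held, free := tryTwice()
	fmt.Println(held, free)
}
```

Unlike Lock, TryLock never blocks. The Go documentation notes that correct uses of TryLock are rare, so in most programs Lock and Unlock are all we need.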

Example

Let's take a look at an example. We will create a Counter struct and add an Update method which will update the internal value.

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    value int
}

func (c *Counter) Update(n int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Adding %d to %d\n", n, c.value)
    c.value += n
}

func main() {
    var wg sync.WaitGroup

    c := Counter{}

    wg.Add(4)

    go c.Update(10, &wg)
    go c.Update(-5, &wg)
    go c.Update(25, &wg)
    go c.Update(19, &wg)

    wg.Wait()
    fmt.Printf("Result is %d\n", c.value)
}

Let's run this and see what happens.

$ go run main.go
Adding -5 to 0
Adding 10 to 0
Adding 19 to 0
Adding 25 to 0
Result is 49

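That doesn't look right. Every goroutine saw a value of zero, yet we somehow got the correct answer.

Well, this is because, in our example, multiple goroutines are reading and updating the value variable at the same time without any synchronization. This is a data race, so the result is unpredictable and can change from run to run. Running the program with the race detector enabled (go run -race main.go) will report it.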

This is the perfect use case for Mutex. So, let's start by using sync.Mutex and wrap our critical section in between Lock() and Unlock() methods.

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    m     sync.Mutex
    value int
}

func (c *Counter) Update(n int, wg *sync.WaitGroup) {
    defer wg.Done()

    // Hold the lock for the whole critical section and release it on return.
    c.m.Lock()
    defer c.m.Unlock()
    fmt.Printf("Adding %d to %d\n", n, c.value)
    c.value += n
}

func main() {
    var wg sync.WaitGroup

    c := Counter{}

    wg.Add(4)

    go c.Update(10, &wg)
    go c.Update(-5, &wg)
    go c.Update(25, &wg)
    go c.Update(19, &wg)

    wg.Wait()
    fmt.Printf("Result is %d\n", c.value)
}

$ go run main.go
Adding -5 to 0
Adding 19 to -5
Adding 25 to 14
Adding 10 to 39
Result is 49

Looks like we solved our issue and the output looks correct as well.

Note: Similar to WaitGroup, a Mutex must not be copied after first use.

RWMutex

An RWMutex is a reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer.

In other words, readers don't have to wait for each other. They only have to wait for writers holding the lock.

sync.RWMutex is thus preferable for data that is mostly read. Compared to a sync.Mutex, it saves the time that readers would otherwise spend waiting for each other.

Usage

Similar to sync.Mutex, sync.RWMutex provides the following methods:

  • Lock() acquires the lock for writing.
  • Unlock() releases the write lock.
  • RLock() acquires the lock for reading.
  • RUnlock() releases the read lock.

Notice how RWMutex has additional RLock and RUnlock methods compared to Mutex.

Example

Let's add a GetValue method which will read the counter value. We will also change sync.Mutex to sync.RWMutex.

Now, we can simply use the RLock and RUnlock methods so that readers don't have to wait for each other.

package main

import (
    "fmt"
    "sync"
    "time"
)

type Counter struct {
    m     sync.RWMutex
    value int
}

func (c *Counter) Update(n int, wg *sync.WaitGroup) {
    defer wg.Done()

    c.m.Lock()
    fmt.Printf("Adding %d to %d\n", n, c.value)
    c.value += n
    c.m.Unlock()
}

func (c *Counter) GetValue(wg *sync.WaitGroup) {
    defer wg.Done()

    c.m.RLock()
    defer c.m.RUnlock()
    fmt.Println("Get value:", c.value)
    time.Sleep(400 * time.Millisecond)
}

func main() {
    var wg sync.WaitGroup

    c := Counter{}

    wg.Add(4)

    go c.Update(10, &wg)
    go c.GetValue(&wg)
    go c.GetValue(&wg)
    go c.GetValue(&wg)

    wg.Wait()
}

$ go run main.go
Get value: 0
Adding 10 to 0
Get value: 10
Get value: 10

Note: Both sync.Mutex and sync.RWMutex implement the sync.Locker interface:

type Locker interface {
    Lock()
    Unlock()
}
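Because both types satisfy sync.Locker, we can write helpers that work with either of them. The following is a minimal sketch of that idea. The withLock and lockerDemo functions are hypothetical names, while RLocker is the standard RWMutex method that returns a Locker backed by RLock and RUnlock.

```go
package main

import (
	"fmt"
	"sync"
)

// withLock is a hypothetical helper that runs f while holding l.
// It accepts any sync.Locker, so it works with both mutex types.
func withLock(l sync.Locker, f func()) {
	l.Lock()
	defer l.Unlock()
	f()
}

// lockerDemo is a hypothetical driver that uses withLock with a Mutex,
// the write side of an RWMutex, and the read side of an RWMutex.
func lockerDemo() int {
	var mu sync.Mutex
	var rw sync.RWMutex
	value := 0

	withLock(&mu, func() { value += 1 }) // plain Mutex
	withLock(&rw, func() { value += 2 }) // write lock of the RWMutex

	read := 0
	// RLocker maps Lock and Unlock to RLock and RUnlock.
	withLock(rw.RLocker(), func() { read = value })
	return read
}

func main() {
	fmt.Println(lockerDemo())
}
```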

Cond

The sync.Cond condition variable can be used to coordinate goroutines that share a resource. When the state of the shared resource changes, it can be used to notify the goroutines that are waiting for that change.

Each Cond has an associated lock (often a *Mutex or *RWMutex), which must be held when changing the condition and when calling the Wait method.

But why do we need it?

One scenario is when one goroutine is receiving data, and other goroutines must wait for it to finish before they can read the correct data.

If we simply use a mutex or a single channel send, only one waiting goroutine gets woken up to read the data. There is no direct way to notify all the other goroutines at once. Thus, we can use sync.Cond to coordinate access to shared resources.

Usage

sync.Cond comes with the following methods:

  • NewCond(l Locker) returns a new Cond.
  • Broadcast() wakes all goroutines waiting on the condition.
  • Signal() wakes one goroutine waiting on the condition if there is any.
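  • Wait() atomically unlocks the underlying lock and suspends the calling goroutine. When the goroutine is woken up, Wait() locks the underlying lock again before returning.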

Example

Here is an example that demonstrates the interaction of different goroutines using the Cond.

package main

import (
    "fmt"
    "sync"
    "time"
)

var done = false

func read(name string, c *sync.Cond) {
    c.L.Lock()
    for !done {
        c.Wait()
    }
    fmt.Println(name, "starts reading")
    c.L.Unlock()
}

func write(name string, c *sync.Cond) {
    fmt.Println(name, "starts writing")
    time.Sleep(time.Second)

    c.L.Lock()
    done = true
    c.L.Unlock()

    fmt.Println(name, "wakes all")
    c.Broadcast()
}

func main() {
    var m sync.Mutex
    cond := sync.NewCond(&m)

    go read("Reader 1", cond)
    go read("Reader 2", cond)
    go read("Reader 3", cond)
    write("Writer", cond)

    time.Sleep(4 * time.Second)
}

$ go run main.go
Writer starts writing
Writer wakes all
Reader 2 starts reading
Reader 3 starts reading
Reader 1 starts reading

As we can see, the readers were suspended using the Wait method until the writer used the Broadcast method to wake them all up.
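Signal is the counterpart of Broadcast for cases where a single waiting goroutine is enough. As a rough sketch of how that might look, here is a tiny blocking queue. The queue type and the newQueue, put, get, and drain names are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical blocking queue built on sync.Cond.
type queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// put appends an item and wakes one waiting consumer, if there is any.
func (q *queue) put(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// get blocks until an item is available, then removes and returns it.
func (q *queue) get() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 { // Re-check the condition after every wake-up.
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

// drain is a hypothetical driver: a producer goroutine puts 1..n
// while the caller receives n items and sums them.
func drain(n int) int {
	q := newQueue()
	go func() {
		for i := 1; i <= n; i++ {
			q.put(i)
		}
	}()

	sum := 0
	for i := 0; i < n; i++ {
		sum += q.get()
	}
	return sum
}

func main() {
	fmt.Println(drain(5))
}
```

Notice that get calls Wait inside a for loop, just like the read function above. The condition is checked while holding the lock, so a Signal sent before the consumer starts waiting is not lost.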

Once

Once ensures that a function is executed only once, even when it is called from several goroutines.

Usage

Unlike other primitives, sync.Once only has a single method:

  • Do(f func()) calls the function f only once. If Do is called multiple times on the same Once, only the first call will invoke f, even if a different function is passed in later calls.

Example

This seems pretty straightforward. Let's take an example.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var count int

    increment := func() {
        count++
    }

    var once sync.Once

    var increments sync.WaitGroup
    increments.Add(100)

    for i := 0; i < 100; i++ {
        go func() {
            defer increments.Done()
            once.Do(increment)
        }()
    }

    increments.Wait()
    fmt.Printf("Count is %d\n", count)
}

$ go run main.go
Count is 1

As we can see, even when we ran 100 goroutines, the count only got incremented once.
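A typical real-world use of Once is lazy initialization of a shared value. The sketch below illustrates the idea under some assumptions: the settings map, the loadSettings and getSettings functions, and the countLoads driver are all hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	once     sync.Once
	settings map[string]string
	loads    int // counts how many times loadSettings actually ran
)

// loadSettings is a hypothetical, potentially expensive initializer.
func loadSettings() {
	loads++
	settings = map[string]string{"env": "dev"}
}

// getSettings initializes settings on first use and returns it.
func getSettings() map[string]string {
	once.Do(loadSettings)
	return settings
}

// countLoads calls getSettings from several goroutines and reports
// how many times the initializer ran.
func countLoads(callers int) int {
	var wg sync.WaitGroup
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = getSettings()["env"]
		}()
	}
	wg.Wait()
	return loads
}

func main() {
	fmt.Println("initializer ran", countLoads(10), "time(s)")
}
```

Every caller of getSettings sees the fully initialized map, because Do does not return until the first call to the function has completed.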

Pool

Pool is a scalable pool of temporary objects that is safe for concurrent use. Any item stored in the pool may be removed automatically at any time without notification. In addition, under high load, the object pool can grow dynamically, and when it is not used or the concurrency is not high, the object pool will shrink.

The key idea is to reuse objects to avoid repeated creation and destruction, which would hurt performance.

But why do we need it?

Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.

The appropriate use of a Pool is to manage a group of temporary items silently shared among and potentially reused by concurrent independent clients of a package. Pool provides a way to spread the cost of allocation overhead across many clients.

It is important to note that Pool also has its own performance cost. For objects that are cheap to create, going through sync.Pool can be slower than simple initialization, so it is worth benchmarking before adopting it. Also, a Pool must not be copied after first use.

Usage

sync.Pool gives us the following methods:

  • Get() selects an arbitrary item from the Pool, removes it from the Pool, and returns it to the caller.
  • Put(x any) adds x to the pool.

Example

Now, let's look at an example.

First, we will create a new sync.Pool, where we can optionally specify a New function to generate a value when Get is called on an empty pool. Without it, Get will return nil.

package main

import (
    "fmt"
    "sync"
)

type Person struct {
    Name string
}

var pool = sync.Pool{
    New: func() any {
        fmt.Println("Creating a new person...")
        return &Person{}
    },
}

func main() {
    person := pool.Get().(*Person)
    fmt.Println("Get object from sync.Pool for the first time:", person)

    // Finish modifying the object first. It must not be used after Put.
    person.Name = "Gopher"
    fmt.Println("Set object property name:", person.Name)

    fmt.Println("Put the object back in the pool")
    pool.Put(person)

    fmt.Println("Get object from pool again (it's updated):", pool.Get().(*Person))
    fmt.Println("There is no object in the pool now (new one will be created):", pool.Get().(*Person))
}

And if we run this, we'll see an interesting output:

$ go run main.go
Creating a new person...
Get object from sync.Pool for the first time: &{}
Set object property name: Gopher
Put the object back in the pool
Get object from pool again (it's updated): &{Gopher}
Creating a new person...
There is no object in the pool now (new one will be created): &{}

Notice how we used a type assertion when calling Get, because Get returns a value of type any.

As we can see, sync.Pool is strictly a temporary object pool, which is suitable for short-lived objects that are reused across goroutines.
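A common use of Pool is reusing buffers. Here is a minimal sketch of that pattern, assuming a pool of bytes.Buffer values. The bufPool variable and the greet function are hypothetical names used only for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool is a hypothetical pool of reusable buffers.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// greet is a hypothetical helper that builds a string using a pooled buffer.
func greet(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()            // A reused buffer may still hold old data.
	defer bufPool.Put(buf) // Return the buffer once we are done with it.

	buf.WriteString("hello, ")
	buf.WriteString(name)
	return buf.String() // String copies the bytes, so the result stays valid after Put.
}

func main() {
	fmt.Println(greet("gopher"))
	fmt.Println(greet("world"))
}
```

The important design choice is to reset the object after Get and to stop using it after Put, since another goroutine may pick it up right away.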

Map

Map is like the standard map[any]any but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time.

But why do we need it?

The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.

The Map type is optimized for two common use cases:

  • When the entry for a given key is only ever written once but read many times, as in caches that only grow.
  • When multiple goroutines read, write, and overwrite entries for disjoint sets of keys.

In these two cases, the use of a sync.Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.

The zero Map is empty and ready for use. A Map must not be copied after first use.

Usage

sync.Map gives us the following methods:

  • Delete(key any) deletes the value for a key.
  • Load(key any) returns the value stored in the map for a key, or nil if no value is present. The ok result reports whether the value was found in the map.
  • LoadAndDelete(key any) deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.
  • LoadOrStore(key, value any) returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, and false if stored.
  • Store(key, value any) sets the value for a key.
  • Range(f func(key, value any) bool) calls f sequentially for each key and value present in the map. If f returns false, the range stops the iteration.

Note: Range does not necessarily correspond to any consistent snapshot of the Map's contents.
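To see how these methods behave, here is a short single-goroutine sketch. The mapDemo function and the keys used are hypothetical and only serve as an illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// mapDemo is a hypothetical walk through the sync.Map methods.
// It returns the number of entries left in the map.
func mapDemo() int {
	var m sync.Map

	m.Store("a", 1)

	// The key exists, so the stored value is returned and loaded is true.
	v, loaded := m.LoadOrStore("a", 100)
	fmt.Println(v, loaded)

	// The key is missing, so the given value is stored and loaded is false.
	v, loaded = m.LoadOrStore("b", 2)
	fmt.Println(v, loaded)

	// Load reports through ok whether the key was present.
	_, ok := m.Load("missing")
	fmt.Println(ok)

	m.Delete("a")

	count := 0
	m.Range(func(key, value any) bool {
		count++
		return true // Returning true keeps the iteration going.
	})
	return count
}

func main() {
	fmt.Println("entries left:", mapDemo())
}
```

Since keys and values are of type any, we usually need a type assertion before using a loaded value, which is one reason a plain map with a mutex is often the better default.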

Example

Let's look at an example. Here, we will launch a bunch of goroutines that will add and retrieve values from our map concurrently.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    var m sync.Map

    wg.Add(10)
    for i := 0; i <= 4; i++ {
        go func(k int) {
            v := fmt.Sprintf("value %v", k)

            fmt.Println("Writing:", v)
            m.Store(k, v)
            wg.Done()
        }(i)
    }

    for i := 0; i <= 4; i++ {
        go func(k int) {
            v, _ := m.Load(k)
            fmt.Println("Reading:", v)
            wg.Done()
        }(i)
    }

    wg.Wait()
}

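As expected, our store and load operations are safe for concurrent use. The order of the output depends on scheduling, so it can differ between runs, and a read that happens before the matching write prints <nil>.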

$ go run main.go
Reading: <nil>
Writing: value 0
Writing: value 1
Writing: value 2
Writing: value 3
Writing: value 4
Reading: value 0
Reading: value 1
Reading: value 2
Reading: value 3

Atomic

Package atomic provides low-level atomic memory primitives for integers and pointers that are useful for implementing synchronization algorithms.

Usage

The atomic package provides several functions which perform the following 5 operations on the int32, int64, uint32, uint64, and uintptr types:

  • Add
  • Load
  • Store
  • Swap
  • Compare and Swap
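To make these operations more concrete, here is a small single-goroutine sketch using the int32 variants. The atomicDemo function is a hypothetical name, while the atomic functions themselves come from the sync/atomic package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// atomicDemo is a hypothetical walk through the int32 operations.
func atomicDemo() (old int32, swapped bool, final int32) {
	var n int32

	atomic.StoreInt32(&n, 5)       // n is now 5
	old = atomic.SwapInt32(&n, 10) // n is now 10, old holds the previous value 5

	// Compare and Swap succeeds only if n still holds the expected value 10.
	swapped = atomic.CompareAndSwapInt32(&n, 10, 20) // n is now 20

	atomic.AddInt32(&n, -1) // n is now 19

	final = atomic.LoadInt32(&n)
	return old, swapped, final
}

func main() {
	fmt.Println(atomicDemo())
}
```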

Example

We won't be able to cover all of the functions here. So, let's take a look at the most commonly used function like AddInt32 to get an idea.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func add(w *sync.WaitGroup, num *int32) {
    defer w.Done()
    atomic.AddInt32(num, 1)
}

func main() {
    var n int32 = 0
    var wg sync.WaitGroup

    wg.Add(1000)
    for i := 0; i < 1000; i++ {
        go add(&wg, &n)
    }

    wg.Wait()

    fmt.Println("Result:", n)
}

Here, atomic.AddInt32 guarantees that the final value of n will be 1000, because each increment is performed as a single indivisible operation that other goroutines cannot interleave with.

$ go run main.go
Result: 1000

Well, this covers our discussion about the sync package in Go. I'll see you in the next tutorial!