Sync package in Go
Learn about concurrency primitives such as WaitGroup, Mutex, Cond, Pool, Once, and Map
As we learned earlier, goroutines run in the same address space, so access to shared memory must be synchronized. The sync package provides useful primitives for this.
WaitGroup
A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.
Usage
We can use sync.WaitGroup with the following methods:
- Add(delta int) adds delta to the WaitGroup counter, which is essentially the number of goroutines the WaitGroup has to wait for. It must be called before the goroutine is started.
- Done() is called from within the goroutine to signal that it has finished. It decrements the counter by one.
- Wait() blocks until the counter reaches zero, that is, until every goroutine counted by Add() has called Done().
Example
Let's take a look at an example.

```go
package main

import (
	"fmt"
	"sync"
)

func work() {
	fmt.Println("working...")
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		work()
	}()
	wg.Wait()
}
```

If we run this, we can see our program runs as expected.

```
$ go run main.go
working...
```
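We can also pass the WaitGroup to the function directly.

```go
package main

import (
	"fmt"
	"sync"
)

// work receives a pointer so that Done decrements the caller's WaitGroup.
func work(wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("working...")
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go work(&wg)
	wg.Wait()
}
```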
But it is important to know that a WaitGroup must not be copied after first use. If it's explicitly passed into functions, it should be passed by pointer. A copy has its own counter, so calling Done on the copy never decrements the original, and Wait on the original would block forever. go vet reports this kind of copy.
Let's also increase the number of goroutines and update our WaitGroup's Add call to wait for 4 goroutines.

```go
// work is the same function as in the previous example.
func main() {
	var wg sync.WaitGroup
	wg.Add(4)
	go work(&wg)
	go work(&wg)
	go work(&wg)
	go work(&wg)
	wg.Wait()
}
```

And as expected, all our goroutines were executed.

```
$ go run main.go
working...
working...
working...
working...
```
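When the number of goroutines is only known at runtime, the usual pattern is to call Add inside the loop that starts them. Below is a minimal sketch; squares is a hypothetical helper, and each goroutine writes only to its own slice index so that no extra locking is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// squares launches one goroutine per input and waits for all of them.
// Each goroutine writes only to its own slice index, so no lock is needed.
func squares(nums []int) []int {
	results := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, n := range nums {
		wg.Add(1) // call Add before starting the goroutine
		go func(i, n int) {
			defer wg.Done()
			results[i] = n * n
		}(i, n)
	}
	wg.Wait() // blocks until every goroutine has called Done
	return results
}

func main() {
	fmt.Println(squares([]int{1, 2, 3, 4})) // [1 4 9 16]
}
```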
Mutex
A Mutex is a mutual exclusion lock: while one goroutine holds it inside a critical section, other goroutines cannot enter that section, which prevents race conditions.
What's a critical section?
A critical section is a piece of code that must not be run by multiple goroutines at once because it accesses shared resources.
Usage
We can use sync.Mutex with the following methods:
- Lock() acquires the lock, blocking until it is available.
- Unlock() releases the lock.
- TryLock() tries to acquire the lock without blocking and reports whether it succeeded (available since Go 1.18).
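To see how TryLock differs from Lock, here is a small sketch (Go 1.18 or later; tryLockDemo is a hypothetical name). TryLock never blocks: it simply reports false while the mutex is held. The Go documentation notes that correct uses of TryLock exist but are rare, so Lock is usually the better choice.

```go
package main

import (
	"fmt"
	"sync"
)

// tryLockDemo shows that TryLock fails while the mutex is held
// and succeeds once it has been released. Requires Go 1.18 or later.
func tryLockDemo() (whileHeld, afterUnlock bool) {
	var mu sync.Mutex
	mu.Lock()
	whileHeld = mu.TryLock() // false: the lock is already held
	mu.Unlock()
	afterUnlock = mu.TryLock() // true: the lock was free
	if afterUnlock {
		mu.Unlock() // release the lock we just acquired
	}
	return whileHeld, afterUnlock
}

func main() {
	fmt.Println(tryLockDemo()) // false true
}
```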
Example
Let's take a look at an example. We will create a Counter struct and add an Update method that updates its internal value.
```go
package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	value int
}

func (c *Counter) Update(n int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("Adding %d to %d\n", n, c.value)
	c.value += n
}

func main() {
	var wg sync.WaitGroup
	c := Counter{}
	wg.Add(4)
	go c.Update(10, &wg)
	go c.Update(-5, &wg)
	go c.Update(25, &wg)
	go c.Update(19, &wg)
	wg.Wait()
	fmt.Printf("Result is %d\n", c.value)
}
```

Let's run this and see what happens.

```
$ go run main.go
Adding -5 to 0
Adding 10 to 0
Adding 19 to 0
Adding 25 to 0
Result is 49
```
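That doesn't look right: every goroutine reports adding to 0, yet we still got the correct total. This happens because multiple goroutines read and write the value field at the same time without any synchronization, which is a data race. The final result happened to be correct in this run, but nothing guarantees it. Running the program with go run -race main.go should make the race detector report the problem.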
This is the perfect use case for a Mutex. So, let's add a sync.Mutex to Counter and wrap our critical section between the Lock() and Unlock() calls.
```go
package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	m     sync.Mutex
	value int
}

func (c *Counter) Update(n int, wg *sync.WaitGroup) {
	defer wg.Done()
	c.m.Lock() // only one goroutine at a time can pass this point
	fmt.Printf("Adding %d to %d\n", n, c.value)
	c.value += n
	c.m.Unlock()
}

func main() {
	var wg sync.WaitGroup
	c := Counter{}
	wg.Add(4)
	go c.Update(10, &wg)
	go c.Update(-5, &wg)
	go c.Update(25, &wg)
	go c.Update(19, &wg)
	wg.Wait()
	fmt.Printf("Result is %d\n", c.value)
}
```

```
$ go run main.go
Adding -5 to 0
Adding 19 to -5
Adding 25 to 14
Adding 10 to 39
Result is 49
```
Looks like we solved our issue and the output looks correct as well.
Note: Similar to WaitGroup, a Mutex must not be copied after first use.
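A common refinement, sketched below under the assumption that we only need increment and read operations, is to keep the mutex inside the type, release it with defer, and guard reads with the same lock. SafeCounter and countConcurrently are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCounter guards value with a mutex. Methods use pointer receivers
// so the mutex is never copied.
type SafeCounter struct {
	mu    sync.Mutex
	value int
}

// Inc releases the lock via defer, so it is unlocked even if the
// critical section returns early or panics.
func (c *SafeCounter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// Value reads under the same lock so the read is synchronized with writes.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// countConcurrently runs n goroutines that each call Inc once.
func countConcurrently(n int) int {
	var c SafeCounter
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			c.Inc()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countConcurrently(1000)) // 1000
}
```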
RWMutex
An RWMutex is a reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer.
In other words, readers don't have to wait for each other. They only have to wait for writers holding the lock.
sync.RWMutex is therefore preferable for data that is mostly read. What it saves compared to a sync.Mutex is time: concurrent readers no longer queue up behind one another.
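As a rough illustration of the read-mostly case, here is a sketch of a hypothetical Config store where Get uses the read lock and Set uses the write lock. The type and method names are made up for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// Config is a hypothetical read-mostly store: many goroutines call Get,
// and Set is called rarely.
type Config struct {
	mu   sync.RWMutex
	data map[string]string
}

func NewConfig() *Config {
	return &Config{data: make(map[string]string)}
}

// Get takes the read lock, so concurrent readers do not block each other.
func (c *Config) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[key]
	return v, ok
}

// Set takes the write lock, which waits for active readers to finish
// and excludes everyone else while the map is modified.
func (c *Config) Set(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = value
}

func main() {
	cfg := NewConfig()
	cfg.Set("env", "production")

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, _ := cfg.Get("env")
			fmt.Println("read:", v)
		}()
	}
	wg.Wait()
}
```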
Usage
Similar to sync.Mutex, we can use sync.RWMutex with the following methods:
- Lock() acquires the write lock.
- Unlock() releases the write lock.
- RLock() acquires a read lock.
- RUnlock() releases a read lock.
Notice how RWMutex has additional RLock and RUnlock methods compared to Mutex.
Example
Let's add a GetValue method that reads the counter value, and change sync.Mutex to sync.RWMutex. Now, we can simply use the RLock and RUnlock methods in GetValue so that readers don't have to wait for each other.
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Counter struct {
	m     sync.RWMutex
	value int
}

func (c *Counter) Update(n int, wg *sync.WaitGroup) {
	defer wg.Done()
	c.m.Lock() // a writer gets exclusive access
	fmt.Printf("Adding %d to %d\n", n, c.value)
	c.value += n
	c.m.Unlock()
}

func (c *Counter) GetValue(wg *sync.WaitGroup) {
	defer wg.Done()
	c.m.RLock() // several readers can hold the read lock at once
	defer c.m.RUnlock()
	fmt.Println("Get value:", c.value)
	time.Sleep(400 * time.Millisecond) // simulate a slow read while holding the lock
}

func main() {
	var wg sync.WaitGroup
	c := Counter{}
	wg.Add(4)
	go c.Update(10, &wg)
	go c.GetValue(&wg)
	go c.GetValue(&wg)
	go c.GetValue(&wg)
	wg.Wait()
}
```

```
$ go run main.go
Get value: 0
Adding 10 to 0
Get value: 10
Get value: 10
```

The order of these lines depends on scheduling and may differ between runs.
Note: Both sync.Mutex and sync.RWMutex implement the sync.Locker interface:

```go
type Locker interface {
	Lock()
	Unlock()
}
```
Cond
The sync.Cond condition variable can be used to coordinate goroutines that share a resource. When the state of the shared resource changes, it can be used to notify the goroutines that are waiting on the condition.
Each Cond has an associated lock (often a *Mutex or *RWMutex), which must be held when changing the condition and when calling the Wait method.
But why do we need it?
One scenario is when one goroutine is receiving data, and other goroutines must wait for it to finish before they can read the correct data.
A single channel send or mutex unlock hands off to only one waiting goroutine, so it does not notify all the others that the data is ready. With sync.Cond, we can wake all of them at once, which makes it a good fit for coordinating shared resources.
Usage
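sync.Cond comes with the following function and methods:
- NewCond(l Locker) returns a new Cond that uses the Locker l.
- Broadcast() wakes all goroutines waiting on the condition.
- Signal() wakes one goroutine waiting on the condition, if there is any.
- Wait() atomically unlocks the underlying lock c.L and suspends the calling goroutine. When it resumes, Wait locks c.L again before returning, which is why the condition is usually checked in a loop.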
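Signal is often paired with Wait to build a blocking queue. The following sketch assumes a single producer and a single consumer; Queue, NewQueue, Put, and Get are hypothetical names, not part of the sync package.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is a hypothetical blocking FIFO: Get waits on a sync.Cond
// until at least one item is available.
type Queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func NewQueue() *Queue {
	q := &Queue{}
	q.cond = sync.NewCond(&q.mu) // the Cond shares the queue's mutex
	return q
}

// Put appends an item and wakes one waiting consumer with Signal.
func (q *Queue) Put(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// Get blocks until an item is present. Wait is called in a loop
// because the condition must be rechecked after every wakeup.
func (q *Queue) Get() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // unlocks q.mu while waiting, relocks before returning
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := NewQueue()
	go func() {
		for i := 1; i <= 5; i++ {
			q.Put(i)
		}
	}()
	sum := 0
	for i := 0; i < 5; i++ {
		sum += q.Get()
	}
	fmt.Println("sum:", sum) // sum: 15
}
```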
Example
Here is an example that demonstrates the interaction of different goroutines using Cond.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// done is the shared condition; it is only read or written while holding c.L.
var done = false

func read(name string, c *sync.Cond) {
	c.L.Lock()
	for !done {
		c.Wait() // releases c.L while waiting and reacquires it on wakeup
	}
	fmt.Println(name, "starts reading")
	c.L.Unlock()
}

func write(name string, c *sync.Cond) {
	fmt.Println(name, "starts writing")
	time.Sleep(time.Second)
	c.L.Lock()
	done = true
	c.L.Unlock()
	fmt.Println(name, "wakes all")
	c.Broadcast()
}

func main() {
	var m sync.Mutex
	cond := sync.NewCond(&m)
	go read("Reader 1", cond)
	go read("Reader 2", cond)
	go read("Reader 3", cond)
	write("Writer", cond)
	time.Sleep(4 * time.Second) // crude: give the readers time to print before main exits
}
```

```
$ go run main.go
Writer starts writing
Writer wakes all
Reader 2 starts reading
Reader 3 starts reading
Reader 1 starts reading
```
As we can see, the readers were suspended in the Wait method until the writer used the Broadcast method to wake them up.
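The example above relies on time.Sleep to keep main alive until the readers print. Here is a sketch of a variant that waits with a WaitGroup instead; broadcastDemo is a hypothetical helper that returns how many readers finished.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastDemo starts n readers that wait on cond until ready is true,
// then sets ready and calls Broadcast. A WaitGroup, rather than
// time.Sleep, waits for every reader to finish.
func broadcastDemo(n int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false
	finished := 0

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			mu.Lock()
			for !ready {
				cond.Wait()
			}
			finished++ // still holding mu here, so this update is safe
			mu.Unlock()
		}()
	}

	mu.Lock()
	ready = true
	mu.Unlock()
	cond.Broadcast() // wake every goroutine blocked in Wait

	wg.Wait()
	return finished
}

func main() {
	fmt.Println("readers finished:", broadcastDemo(3)) // readers finished: 3
}
```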
Once
Once ensures that a function is executed only once, even when it is called from several goroutines.
Usage
Unlike other primitives, sync.Once has only a single method:
- Do(f func()) calls the function f only once. If Do is called multiple times, only the first call invokes f.
Example
This seems pretty straightforward, so let's take an example.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var count int
	increment := func() {
		count++
	}

	var once sync.Once
	var increments sync.WaitGroup
	increments.Add(100)
	for i := 0; i < 100; i++ {
		go func() {
			defer increments.Done()
			once.Do(increment) // only the first call runs increment
		}()
	}
	increments.Wait()
	fmt.Printf("Count is %d\n", count)
}
```

```
$ go run main.go
Count is 1
```
As we can see, even when we ran 100 goroutines, the count only got incremented once.
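A typical use of Once is lazy initialization of a shared value. In this sketch, Settings, getSettings, and initCalls are hypothetical names; initCalls exists only to show that the initializer runs a single time.

```go
package main

import (
	"fmt"
	"sync"
)

// Settings stands in for some expensive-to-build value (hypothetical).
type Settings struct {
	Name string
}

var (
	settings     *Settings
	settingsOnce sync.Once
	initCalls    int // counts how often the initializer actually ran
)

// getSettings builds Settings on first use; later calls, from any
// goroutine, return the same pointer.
func getSettings() *Settings {
	settingsOnce.Do(func() {
		initCalls++
		settings = &Settings{Name: "default"}
	})
	return settings
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = getSettings()
		}()
	}
	wg.Wait()
	fmt.Println("initializer ran", initCalls, "time(s)") // initializer ran 1 time(s)
}
```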
Pool
Pool is a scalable pool of temporary objects that is safe for concurrent use. Any item stored in the pool may be removed at any time without notification. In addition, the pool can grow under high load and shrink when it is unused or concurrency is low.
The key idea is to reuse objects instead of repeatedly creating and destroying them, which hurts performance.
But why do we need it?
Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.
The appropriate use of a Pool is to manage a group of temporary items silently shared among and potentially reused by concurrent independent clients of a package. Pool provides a way to spread the cost of allocation overhead across many clients.
It is important to note that a Pool has its own performance cost: for small objects that are cheap to create, using sync.Pool can be slower than simply allocating a new one. Also, a Pool must not be copied after first use.
Usage
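sync.Pool gives us the following methods:
- Get() selects an arbitrary item from the Pool, removes it from the Pool, and returns it to the caller. If the Pool has nothing to return and New is set, Get returns the result of calling New; otherwise it returns nil.
- Put(x any) adds x to the pool.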
Example
Now, let's look at an example.
First, we will create a new sync.Pool. We can optionally give it a New function that generates a value whenever Get finds the pool empty; otherwise, Get returns nil.
```go
package main

import (
	"fmt"
	"sync"
)

type Person struct {
	Name string
}

var pool = sync.Pool{
	// New is called by Get when the pool has no object to return.
	New: func() any {
		fmt.Println("Creating a new person...")
		return &Person{}
	},
}

func main() {
	person := pool.Get().(*Person)
	fmt.Println("Get object from sync.Pool for the first time:", person)
	person.Name = "Gopher"
	fmt.Println("Set object property name:", person.Name)
	// Finish modifying the object before returning it: after Put,
	// another goroutine may already be using it.
	fmt.Println("Put the object back in the pool")
	pool.Put(person)
	fmt.Println("Get object from pool again (it's updated):", pool.Get().(*Person))
	fmt.Println("There is no object in the pool now (new one will be created):", pool.Get().(*Person))
}
```

And if we run this, we'll see an interesting output:

```
$ go run main.go
Creating a new person...
Get object from sync.Pool for the first time: &{}
Set object property name: Gopher
Put the object back in the pool
Get object from pool again (it's updated): &{Gopher}
Creating a new person...
There is no object in the pool now (new one will be created): &{}
```
Notice how we used a type assertion when calling Get: Get returns a value of type any, so we have to assert it back to *Person.
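As we can see, sync.Pool is strictly a temporary object pool, suitable for storing temporary objects that are shared among goroutines. Keep in mind that the pool does not guarantee that an object you Put will be returned by a later Get (pooled objects can, for example, be dropped during garbage collection), so the output above is typical rather than guaranteed.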
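In practice, pools usually hold buffers that are reset before reuse, since an object from Get may still contain data from its previous user. Here is a minimal sketch with bytes.Buffer; bufPool and greet are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out *bytes.Buffer values; New runs only when the pool
// has nothing to return.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// greet borrows a buffer, resets any leftover contents, and returns
// the buffer to the pool when done.
func greet(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset() // a pooled buffer may still hold data from a previous user
	defer bufPool.Put(buf)

	buf.WriteString("Hello, ")
	buf.WriteString(name)
	buf.WriteString("!")
	return buf.String() // String copies the bytes, so reusing buf later is safe
}

func main() {
	fmt.Println(greet("Gopher")) // Hello, Gopher!
	fmt.Println(greet("Go"))     // Hello, Go!
}
```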
Map
Map is like the standard map[any]any but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time.
But why do we need it?
The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.
The Map type is optimized for two common use cases:
- When the entry for a given key is only ever written once but read many times, as in caches that only grow.
- When multiple goroutines read, write, and overwrite entries for disjoint sets of keys.

In these two cases, the use of a sync.Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.
The zero Map is empty and ready for use. A Map must not be copied after first use.
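For the first use case (a cache that only grows), LoadOrStore makes sure that concurrent callers end up with a single stored value. The sketch below caches string lengths purely for illustration; cache and lengthOf are hypothetical names, and the computed value stands in for something more expensive.

```go
package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical grow-only memo table: each key is written once
// and then read many times.
var cache sync.Map

// lengthOf returns the cached value for key, computing and storing it on
// first use. If two goroutines race, LoadOrStore keeps only one value and
// both callers get that value back.
func lengthOf(key string) int {
	if v, ok := cache.Load(key); ok {
		return v.(int)
	}
	v, _ := cache.LoadOrStore(key, len(key))
	return v.(int)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = lengthOf("gopher")
		}()
	}
	wg.Wait()
	fmt.Println(lengthOf("gopher")) // 6
}
```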
Usage
sync.Map gives us the following methods:
- Delete(key any) deletes the value for a key.
- Load(key any) returns the value stored in the map for a key, or nil if no value is present. The ok result reports whether a value was found.
- LoadAndDelete(key any) deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.
- LoadOrStore(key, value any) returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, and false if stored.
- Store(key, value any) sets the value for a key.
- Range(f func(key, value any) bool) calls f sequentially for each key and value present in the map. If f returns false, Range stops the iteration.
Note: Range does not necessarily correspond to any consistent snapshot of the Map's contents.
Example
Let's look at an example. Here, we will launch a bunch of goroutines that will add and retrieve values from our map concurrently.
```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	var m sync.Map
	wg.Add(10)
	// 5 writers
	for i := 0; i <= 4; i++ {
		go func(k int) {
			defer wg.Done()
			v := fmt.Sprintf("value %v", k)
			fmt.Println("Writing:", v)
			m.Store(k, v)
		}(i)
	}
	// 5 readers: a reader may run before the matching writer and see nil
	for i := 0; i <= 4; i++ {
		go func(k int) {
			defer wg.Done()
			v, _ := m.Load(k)
			fmt.Println("Reading:", v)
		}(i)
	}
	wg.Wait()
}
```
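As expected, our store and load operations are safe for concurrent use. Safe does not mean ordered, though: a reader that runs before the matching writer gets nil, as in the first line below, and the order of the lines will vary from run to run.

```
$ go run main.go
Reading: <nil>
Writing: value 0
Writing: value 1
Writing: value 2
Writing: value 3
Writing: value 4
Reading: value 0
Reading: value 1
Reading: value 2
Reading: value 3
```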
Atomic
Package atomic provides low-level atomic memory primitives for integers and pointers that are useful for implementing synchronization algorithms.
Usage
The atomic package provides functions for the following operations on int32, int64, uint32, uint64, and uintptr values (Load, Store, Swap, and Compare and Swap also have unsafe.Pointer variants):
- Add
- Load
- Store
- Swap
- Compare and Swap
Example
We won't be able to cover all of the functions here, so let's look at one of the most commonly used ones, AddInt32, to get an idea.
```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func add(w *sync.WaitGroup, num *int32) {
	defer w.Done()
	atomic.AddInt32(num, 1)
}

func main() {
	var n int32
	var wg sync.WaitGroup
	wg.Add(1000)
	for i := 0; i < 1000; i++ {
		go add(&wg, &n)
	}
	wg.Wait()
	fmt.Println("Result:", n)
}
```
Here, atomic.AddInt32 guarantees that the result of n will be 1000, because each increment is performed as a single indivisible operation that other goroutines cannot interrupt.

```
$ go run main.go
Result: 1000
```
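Since Go 1.19, sync/atomic also offers typed values such as atomic.Int32, whose methods are the only way to reach the underlying integer, so a plain non-atomic read or write cannot slip in by accident. Here is a sketch of the same counter using that type; countTyped is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countTyped repeats the AddInt32 example with the atomic.Int32 type
// (Go 1.19 or later).
func countTyped(goroutines int) int32 {
	var n atomic.Int32
	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func() {
			defer wg.Done()
			n.Add(1)
		}()
	}
	wg.Wait()
	return n.Load()
}

func main() {
	fmt.Println("Result:", countTyped(1000)) // Result: 1000
}
```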
Well, this covers our discussion about the sync package in Go. I'll see you in the next tutorial!