Ruby Multithreading
Every program running on the system is a process. Each process contains one or more threads.
A thread is a single sequence of control within a program. Running multiple threads simultaneously within a single program to accomplish different tasks is called multithreading.
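In Ruby, we create threads with the Thread class. Ruby threads are lightweight and make it easy to write concurrent code. Note that in the reference implementation (CRuby/MRI) a global VM lock allows only one thread to execute Ruby code at a time, so threads help most when tasks spend time waiting, for example in sleep or on I/O.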
Creating Ruby Threads
To start a new thread, simply call Thread.new:
# Thread #1 code section
Thread.new {
  # Thread #2 execution code
}
# Thread #1 execution code
Example
The following example shows how to use multithreading in a Ruby program:
#!/usr/bin/ruby
def func1
  i = 0
  while i <= 2
    puts "func1 at: #{Time.now}"
    sleep(2)
    i = i + 1
  end
end
def func2
  j = 0
  while j <= 2
    puts "func2 at: #{Time.now}"
    sleep(1)
    j = j + 1
  end
end
puts "Started At #{Time.now}"
t1 = Thread.new { func1() }
t2 = Thread.new { func2() }
t1.join
t2.join
puts "End at #{Time.now}"
The above code execution result is:
Started At Wed May 14 08:21:54 -0700 2014
func1 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:54 -0700 2014
func2 at: Wed May 14 08:21:55 -0700 2014
func1 at: Wed May 14 08:21:56 -0700 2014
func2 at: Wed May 14 08:21:56 -0700 2014
func1 at: Wed May 14 08:21:58 -0700 2014
End at Wed May 14 08:22:00 -0700 2014
Thread Lifecycle
- Thread creation can use Thread.new, and similarly, Thread.start or Thread.fork can be used with the same syntax to create threads.
- After creating a thread, it does not need to be started; it will execute automatically.
- The Thread class defines several methods to manipulate threads. The thread executes the code block in Thread.new.
- The value of the last expression in the thread's block is the thread's value, which can be retrieved with the thread's value method. If the thread has finished executing, value returns it immediately; otherwise, value blocks until the thread has finished.
- The Thread.current method returns the object representing the current thread. The Thread.main method returns the main thread.
- The Thread#join method waits for a thread. Calling t.join suspends the calling thread (usually the main thread) until t has finished executing.
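The points in this list can be sketched in a few lines. The helper name run_in_thread and the values it returns are made up for this illustration; only Thread.new, Thread.current, Thread.main, join and value come from Ruby itself.

```ruby
# Hypothetical helper for illustration: runs some work in a new thread and
# returns the thread's value.
def run_in_thread
  worker = Thread.new do
    # Inside the block, Thread.current is the new thread, not the main thread.
    on_main = (Thread.current == Thread.main)
    sleep 0.1                 # simulate some work
    [on_main, 6 * 7]          # the last expression becomes the thread's value
  end
  worker.join                 # suspend the caller until worker has finished
  worker.value                # the thread is done, so this returns at once
end

on_main, answer = run_in_thread
puts "block ran on the main thread: #{on_main}"
puts "thread value: #{answer}"
```

Calling value alone would be enough here, since it also waits for the thread; join is shown separately only to make the waiting step visible.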
Thread States
A thread can be in one of five states. The Thread#status method returns a different value for each:
Thread State | Return Value of status |
---|---|
Runnable | run |
Sleeping | sleep |
Exiting | aborting |
Normal termination | false |
Exception termination | nil |
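Most of the values in this table can be observed directly. The following is a minimal sketch, assuming a hypothetical helper named sample_statuses; the aborting state is short-lived and is not shown. The call to report_on_exception= only silences the error report that recent Ruby versions print for a failing thread.

```ruby
# Hypothetical helper: samples Thread#status at several points in a thread's life.
def sample_statuses
  statuses = {}

  # The thread that is currently executing reports "run".
  statuses[:running] = Thread.current.status

  # A thread blocked in sleep reports "sleep".
  sleeper = Thread.new { sleep }
  sleep 0.01 until sleeper.status == "sleep"
  statuses[:sleeping] = sleeper.status
  sleeper.kill
  sleeper.join

  # After a normal return, status is false.
  finished = Thread.new { :done }
  finished.join
  statuses[:finished] = finished.status

  # After an unhandled exception, status is nil.
  failed = Thread.new do
    Thread.current.report_on_exception = false   # keep stderr quiet for the demo
    raise "boom"
  end
  begin
    failed.join                                  # join re-raises the exception
  rescue RuntimeError
    # expected for this demo
  end
  statuses[:failed] = failed.status

  statuses
end

sample_statuses.each { |name, status| puts "#{name}: #{status.inspect}" }
```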
Thread and Exception
When an exception occurs in a thread and is not caught by rescue, that thread is terminated while the rest of the program keeps running (since Ruby 2.5 the exception is also reported on standard error by default). However, if another thread is waiting for this thread with Thread#join, the same exception is re-raised in the waiting thread.
begin
  t = Thread.new do
    Thread.pass # Let the main thread reach join and start waiting
    raise "unhandled exception"
  end
  t.join
rescue
  p $! # => #<RuntimeError: unhandled exception>
end
To interrupt the interpreter when a thread terminates due to an exception, use one of the following three methods:
- Run the script with the -d option in debug mode.
- Set the flag with Thread.abort_on_exception.
- Set the flag for a specific thread with Thread#abort_on_exception.
When using one of these three methods, the entire interpreter will be interrupted.
t = Thread.new { ... }
t.abort_on_exception = true
Thread Synchronization Control
In Ruby, there are three ways to achieve synchronization:
- Implement thread synchronization through the Mutex class.
- Implement thread synchronization through the Queue class, which manages the handoff of data between threads.
- Use ConditionVariable for synchronization control.
Implementing Thread Synchronization with the Mutex Class
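To synchronize threads with the Mutex class, guard the code that touches shared data with a mutex: when several threads need the same variable at the same time, each one calls lock before using it and unlock afterwards, so only one thread at a time runs the guarded section.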
#!/usr/bin/ruby
require "thread"
puts "Synchronize Thread"
@num = 200
@mutex = Mutex.new
def buyTicket(num)
  @mutex.lock
  if @num >= num
    @num = @num - num
    puts "you have successfully bought #{num} tickets"
  else
    puts "sorry, no enough tickets"
  end
  @mutex.unlock
end
ticket1 = Thread.new 10 do
  10.times do |value|
    ticketNum = 15
    buyTicket(ticketNum)
    sleep 0.01
  end
end
ticket2 = Thread.new 10 do
  10.times do |value|
    ticketNum = 20
    buyTicket(ticketNum)
    sleep 0.01
  end
end
sleep 1
ticket1.join
ticket2.join
The above code execution result is:
Synchronize Thread
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
you have successfully bought 20 tickets
you have successfully bought 15 tickets
sorry, no enough tickets
sorry, no enough tickets
sorry, no enough tickets
sorry, no enough tickets
sorry, no enough tickets
sorry, no enough tickets
sorry, no enough tickets
sorry, no enough tickets
sorry, no enough tickets
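In addition to lock, you can use try_lock, which returns false immediately instead of waiting when the mutex is already held, and Mutex#synchronize, which locks the mutex, runs a block, and unlocks the mutex again even if the block raises an exception.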
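As a rough sketch of these two alternatives, the ticket example can be rewritten around synchronize. The TicketCounter class and its method names are hypothetical and exist only for this illustration; the final lines show try_lock on a separate mutex.

```ruby
# Hypothetical ticket counter, used only for illustration.
class TicketCounter
  attr_reader :remaining

  def initialize(total)
    @remaining = total
    @mutex = Mutex.new
  end

  # synchronize locks the mutex, runs the block, and always unlocks it,
  # even if the block raises. Returns true when the purchase succeeded.
  def buy(count)
    @mutex.synchronize do
      if @remaining >= count
        @remaining -= count
        true
      else
        false
      end
    end
  end
end

counter = TicketCounter.new(200)
buyers = [15, 20].map do |size|
  # Each thread's value is its number of successful purchases.
  Thread.new { 10.times.count { counter.buy(size) } }
end
bought_15, bought_20 = buyers.map(&:value)
puts "sold #{bought_15} x 15 and #{bought_20} x 20, #{counter.remaining} left"

# try_lock never blocks: it returns false when another thread holds the mutex.
lock = Mutex.new
lock.lock
puts Thread.new { lock.try_lock }.value   # false, the calling thread holds the lock
lock.unlock
puts lock.try_lock                        # true, and the lock is now held again
lock.unlock
```

How many purchases each buyer completes depends on thread scheduling, but the remaining count never drops below zero because the check and the subtraction happen under the same lock.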
Implementing Thread Synchronization with the Queue Class
The Queue class represents a thread-safe queue that synchronizes access to the end of the queue. Different threads can use the same queue without worrying about whether the data in the queue is synchronized. Additionally, the SizedQueue class can limit the queue's length.
The SizedQueue class can help us develop thread-synchronized applications very conveniently because once data is added to this queue, we don't need to worry about thread synchronization issues.
The classic producer-consumer problem:
#!/usr/bin/ruby
require "thread"
puts "Queue Test"
queue = Queue.new
producer = Thread.new do
  10.times do |i|
    sleep rand(i) # Let the thread sleep for a while
    queue << i
    puts "#{i} produced"
  end
end
consumer = Thread.new do
  10.times do |i|
    value = queue.pop
    sleep rand(i/2)
    puts "consumed #{value}"
  end
end
consumer.join
The program output:
Queue Test
0 produced
1 produced
consumed 0
2 produced
consumed 1
consumed 2
3 produced
consumed 3
4 produced
consumed 4
5 produced
consumed 5
6 produced
consumed 6
7 produced
consumed 7
8 produced
9 produced
consumed 8
consumed 9
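The example above uses an unbounded Queue. For the SizedQueue class mentioned earlier, a minimal sketch might look like the following; the helper name run_pipeline and the :done sentinel are assumptions made for this illustration.

```ruby
# Hypothetical helper: pushes item_count integers through a bounded queue
# and returns them in the order the consumer received them.
def run_pipeline(item_count, capacity)
  queue = SizedQueue.new(capacity)   # holds at most `capacity` items
  consumed = []

  producer = Thread.new do
    item_count.times { |i| queue.push(i) }   # push blocks while the queue is full
    queue.push(:done)                        # sentinel telling the consumer to stop
  end

  consumer = Thread.new do
    while (item = queue.pop) != :done        # pop blocks while the queue is empty
      sleep 0.01                             # a slow consumer lets the queue fill up
      consumed << item
    end
  end

  [producer, consumer].each(&:join)
  consumed
end

p run_pipeline(10, 3)
```

Because push blocks when the queue is full, a fast producer cannot run arbitrarily far ahead of a slow consumer, which keeps memory use bounded.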
Thread Variables
Threads can have private variables that are written into the thread upon creation and can be used within the thread's scope but cannot be shared outside the thread.
However, sometimes local thread variables need to be accessed by other threads or the main thread. Ruby provides a way to create thread variables by name, treating the thread like a hash-like associative array. Data can be written using []= and read using []. Here's an example:
#!/usr/bin/ruby
count = 0
arr = []
10.times do |i|
  arr[i] = Thread.new {
    sleep(rand(0)/10.0)
    Thread.current["mycount"] = count
    count += 1
  }
end
arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"
The above code execution result is:
8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10
The main thread waits for the child threads to complete and then outputs each value.
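The order of the numbers above depends on the random sleeps. A deterministic variant can make the mechanism easier to follow; in this sketch the helper name collect_labels and the :label key are made up for the illustration.

```ruby
# Hypothetical helper: starts n threads, lets each one store a label under
# the :label key, and reads the labels back from outside the threads.
def collect_labels(n)
  threads = Array.new(n) do |i|
    # Arguments given to Thread.new are passed to the block.
    Thread.new(i) do |index|
      Thread.current[:label] = "worker-#{index}"
    end
  end

  threads.map do |t|
    t.join          # wait until the thread has stored its label
    t[:label]       # read it from outside the thread
  end
end

p collect_labels(3)

t = Thread.new { Thread.current[:label] = "only one" }
t.join
p t.key?(:label)    # true, a value was stored under :label
p t[:missing]       # nil, nothing was stored under :missing
```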
Thread Priority
Thread priority is the main factor affecting thread scheduling. Other factors include the length of CPU execution time, thread group scheduling, etc.
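You can get a thread's priority using the Thread#priority method and adjust it using the Thread#priority= method.
The default thread priority is 0. Higher-priority threads are scheduled more often than lower-priority ones, although lower-priority threads still get to run; the priority is only a hint to the scheduler.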
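Reading and changing a priority might look like the sketch below. The helper name priority_demo is hypothetical, and the sketch assumes CRuby, where small values such as 2 are stored as given; it does not try to measure any scheduling effect.

```ruby
# Hypothetical helper: reads a thread's priority before and after changing it.
def priority_demo
  worker = Thread.new { sleep }    # a parked thread to experiment on
  before = worker.priority         # a new thread starts with its creator's priority
  worker.priority = 2              # ask the scheduler to favor this thread
  after = worker.priority
  worker.kill
  worker.join
  [before, after]
end

before, after = priority_demo
puts "priority before: #{before}, after: #{after}"
```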
A Thread can access all data within its own scope. However, what if you need to access data from other threads? The Thread class provides methods for threads to access each other's data. You can simply treat a thread as a Hash table, writing data with []= and reading data with [].
```ruby
athr = Thread.new { Thread.current["name"] = "Thread A"; Thread.stop }
bthr = Thread.new { Thread.current["name"] = "Thread B"; Thread.stop }
cthr = Thread.new { Thread.current["name"] = "Thread C"; Thread.stop }
Thread.list.each {|x| puts "#{x.inspect}: #{x["name"]}" }
We can see that by treating threads as a Hash table and using the [] and []= methods, we have achieved data sharing between threads.
Thread Mutex
A Mutex (Mutual Exclusion) is a mechanism used in multithreaded programming to prevent two threads from simultaneously accessing a shared resource (like a global variable).
Example without Mutex
#!/usr/bin/ruby
require 'thread'
count1 = count2 = 0
difference = 0
counter = Thread.new do
  loop do
    count1 += 1
    count2 += 1
  end
end
spy = Thread.new do
  loop do
    difference += (count1 - count2).abs
  end
end
sleep 1
puts "count1 : #{count1}"
puts "count2 : #{count2}"
puts "difference : #{difference}"
Output of the above example:
count1 : 9712487
count2 : 12501239
difference : 0
Example with Mutex
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new
count1 = count2 = 0
difference = 0
counter = Thread.new do
  loop do
    mutex.synchronize do
      count1 += 1
      count2 += 1
    end
  end
end
spy = Thread.new do
  loop do
    mutex.synchronize do
      difference += (count1 - count2).abs
    end
  end
end
sleep 1
mutex.lock
puts "count1 : #{count1}"
puts "count2 : #{count2}"
puts "difference : #{difference}"
Output of the above example:
count1 : 1336406
count2 : 1336406
difference : 0
Deadlock
Deadlock occurs when two or more operations are waiting for each other to complete, preventing any of them from proceeding.
For example, a process p1 holds the display and needs the printer, which is held by process p2, which in turn needs the display. This forms a deadlock.
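When using Mutex objects, we need to be cautious of thread deadlocks. In the following example, thread A waits on a ConditionVariable, which releases the mutex while A is waiting; this lets thread B enter the critical section and signal A, instead of the two threads blocking each other.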
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new
cv = ConditionVariable.new
a = Thread.new {
  mutex.synchronize {
    puts "A: I have critical section, but will wait for cv"
    cv.wait(mutex)
    puts "A: I have critical section again! I rule!"
  }
}
puts "(Later, back at the ranch...)"
b = Thread.new {
  mutex.synchronize {
    puts "B: Now I am critical, but am done with cv"
    cv.signal
    puts "B: I am still critical, finishing up"
  }
}
a.join
b.join
Output of the above example:
A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!
Thread Class Methods
Here is a list of commonly used Thread class methods:
No. | Method Description |
---|---|
1 | Thread.abort_on_exception <br>If true, the entire interpreter will be interrupted if a thread terminates due to an exception. Default is false. |
2 | Thread.abort_on_exception= <br>Sets the state to true or false and returns the new state. |
3 | Thread.critical <br>Returns a boolean value. (Ruby 1.8 only; removed in Ruby 1.9.) |
4 | Thread.critical= <br>When set to true, thread switching will not occur. This will automatically become false if the current thread is suspended or a signal is received. (Ruby 1.8 only; removed in Ruby 1.9.) |
5 | Thread.current <br>Returns the currently running thread. |
6 | Thread.exit <br>Terminates the current thread. Returns the current thread. If it is the only thread, the process exits with status 0. |
7 | Thread.fork { block } <br>Creates a new thread. |
8 | Thread.kill( aThread ) <br>Terminates the thread. |
9 | Thread.list <br>Returns an array of alive threads that are running or suspended. |
10 | Thread.main <br>Returns the main thread. |
11 | Thread.new( [ arg ]* ) { \| args \| block } <br>Creates a new thread and starts it. Arguments are passed to the block. |
12 | Thread.pass <br>Yields the CPU to other threads. |
13 | Thread.start( [ args ]* ) { \| args \| block } <br>Creates a new thread and starts it. Arguments are passed to the block. |
14 | Thread.stop <br>Suspends the current thread until another thread wakes it up with the run method. |
Thread Instance Methods
Here is an example calling the join instance method:
#!/usr/bin/ruby
thr = Thread.new do # Instantiation
  puts "In second thread"
  raise "Raise exception"
end
thr.join # Calling the join instance method; the thread's exception is re-raised here
Here is a list of commonly used Thread instance methods:
No. | Method Description |
---|---|
1 | thr[ name ] <br>Retrieves the thread-specific data associated with name. Returns nil if no data is associated. |
2 | thr[ name ] = <br>Sets the thread-specific data associated with name. Setting it to nil deletes the data. |
3 | thr.abort_on_exception <br>Returns a boolean value. |
4 | thr.abort_on_exception= <br>If set to true, the entire interpreter will be interrupted if this thread terminates due to an exception. |
5 | thr.alive? <br>Returns true if the thread is alive. |
6 | thr.exit <br>Terminates the thread. Returns self. |
7 | thr.join <br>Suspends the current thread until self terminates. If self terminates due to an exception, it will raise the same exception in the current thread. |
8 | thr.key?( name ) <br>Returns true if thread-specific data is defined for the given name. |
9 | thr.kill <br>Terminates the thread. Same as thr.exit. |
10 | thr.priority <br>Returns the thread's priority. The default priority is 0. Higher values mean higher priority. |
11 | thr.priority= <br>Sets the thread's priority. Can be negative. |
12 | thr.raise( anException ) <br>Raises an exception within the thread. |
13 | thr.run <br>Resumes a suspended thread. Unlike wakeup, it immediately switches to the thread. |
14 | thr.safe_level <br>Returns the safety level of the thread. The safety level of the current thread is the same as $SAFE. Not available in Ruby 3.0 and later. |
15 | thr.status <br>Returns the status of the thread as a string ("run", "sleep", or "aborting"). Returns false if the thread terminates normally, and nil if it terminates due to an exception. |
16 | thr.stop? <br>Returns true if the thread is dead or suspended. |
17 | thr.value <br>Waits for the thread to terminate and returns the value returned by the block. If an exception occurs in the thread, it will be re-raised in the caller. |
18 | thr.wakeup <br>Marks a suspended thread as eligible to run. If the thread is dead, it will raise a ThreadError. |
```
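Several of the methods listed above work together: Thread.stop suspends a thread, stop? and status let another thread observe that, and run resumes it. The following is a small sketch of that interaction; the helper name stop_and_resume and the logged symbols are made up for this illustration.

```ruby
# Hypothetical helper: suspends a thread with Thread.stop, resumes it with
# run, and returns a log of what happened in order.
def stop_and_resume
  log = []
  worker = Thread.new do
    log << :before_stop
    Thread.stop                  # suspend this thread until another thread wakes it
    log << :after_stop
    :finished                    # the thread's value
  end

  sleep 0.01 until worker.stop?  # stop? becomes true once the thread is suspended
  log << worker.status           # a suspended thread reports "sleep"
  worker.run                     # wake the thread up
  log << worker.value            # value waits for the thread and returns :finished
  log
end

p stop_and_resume
```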