This is the first in a two-part series on distributed computing using Ray. This part shows how to use Ray on your local PC, and part 2 shows how to scale Ray to multi-server clusters in the cloud.
You've just gotten a new 16-core laptop or desktop, and you're eager to test its power with some heavy computations.
You’re a Python programmer, though not an expert yet, so you open your favourite LLM and ask it something like this.
“I would like to count the number of prime numbers within a given input range. Please give me some Python code for this.”
After a few seconds, the LLM gives you some code. You might tweak it a bit through a short back-and-forth, and eventually you end up with something like this:
import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1
    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks
    print(f"CPUs~{total_cpus}, chunks={chunks}")

    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")
You run the program, and it works perfectly. The only problem is that it takes quite a bit of time to run, maybe thirty to sixty seconds, depending on the size of your input range. That's probably unacceptable.
What do you do now? You have several options, with the three most common probably being:
– Parallelise the code using threads or multiprocessing
– Rewrite the code in a “fast” language like C or Rust
– Try a library like Cython, Numba or NumPy
These are all viable options, but each has disadvantages. Options 1 and 3 significantly increase your code complexity, and the middle option may require you to learn a new programming language.
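To make that trade-off concrete, here is a minimal sketch of option 1 using the standard library's concurrent.futures. It reuses count_primes and the A, B, chunks, and step variables from the code above. It works, but the pool plumbing and chunk bookkeeping are now part of your code:

from concurrent.futures import ProcessPoolExecutor

# Build the sub-ranges, then fan them out across one process per core
starts = [A + i * step for i in range(chunks)]
ends = [s + step for s in starts[:-1]] + [B]
with ProcessPoolExecutor() as pool:
    total = sum(pool.map(count_primes, starts, ends))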
What if I told you that there was another way? One where the changes required to your existing code would be kept to an absolute minimum. One where your runtime is automatically spread across all your available cores.
That’s precisely what the third-party Ray library promises to do.
What is Ray?
The Ray Python library is an open-source distributed computing framework designed to make it easy to scale Python programs from a laptop to a cluster with minimal code changes.
Ray makes it simple to scale and distribute compute-intensive application workloads, from deep learning to data processing, across clusters of remote computers, while also delivering practical runtime improvements on a single laptop or desktop.
Ray provides a rich set of libraries and integrations built on a flexible distributed execution framework, making distributed computing easy and accessible to all.
In short, Ray lets you parallelise and distribute your Python code with minimal effort, whether it’s running locally on a laptop or on a giant cloud-based cluster.
Using Ray
In the rest of this article, I’ll take you through the basics of using Ray to speed up CPU-intensive Python code, and we’ll set up some example code snippets to show you how easy it is to incorporate the power of Ray into your own workloads.
To get the most out of Ray, particularly if you are a data scientist or machine learning engineer, there are a few key concepts to understand first. Ray is made up of several components.
Ray Data is a scalable library designed for data processing in ML and AI tasks. It offers flexible, high-performance APIs for AI tasks, including batch inference, data preprocessing, and data ingestion for ML training.
Ray Train is a flexible, scalable library designed for distributed machine learning training and fine-tuning.
Ray Tune is used for hyperparameter tuning.
Ray Serve is a scalable library for deploying models to facilitate online inference APIs.
Ray RLlib is used for scalable reinforcement learning.
As you can see, Ray is heavily focused on machine learning and AI applications, but there is one last important component I haven't mentioned yet, and it's the one I'll be using in this article.
Ray Core is designed for scaling CPU-intensive general-purpose Python applications. It’s designed to spread your Python workload over all available cores on whichever system you’re running it on.
This article will be talking exclusively about Ray Core.
Two essential concepts to grasp within Ray Core are tasks and actors.
Tasks (or stateless workers) are implemented in Ray by decorating regular Python functions.
Actors (or stateful workers) are used, for example, when you need to keep track of and maintain the state of dependent variables across your distributed cluster. Actors are implemented by decorating regular Python classes.
Both actors and tasks are defined using the same @ray.remote decorator. Once defined, these tasks are executed with the special .remote() method provided by Ray. We’ll look at an example of this next.
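Before that full example, here is the pattern in miniature. The names square and Counter are illustrative inventions of mine, not part of Ray's API:

import ray
ray.init()

# A task: a stateless function, declared with @ray.remote
@ray.remote
def square(x: int) -> int:
    return x * x

# An actor: a stateful class, declared with the same decorator
@ray.remote
class Counter:
    def __init__(self):
        self.n = 0

    def increment(self) -> int:
        self.n += 1
        return self.n

# Both are invoked with .remote(), which returns immediately with an ObjectRef
futures = [square.remote(i) for i in range(4)]
counter = Counter.remote()

# ray.get() blocks until the results are ready
print(ray.get(futures))                      # [0, 1, 4, 9]
print(ray.get(counter.increment.remote()))   # 1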
Setting up a development environment
Before we start coding, we should set up a development environment to keep our projects siloed so they don’t interfere with each other. I’ll be using conda for this, but feel free to use whichever tool you prefer. I’ll be running my code using a Jupyter notebook in a WSL2 Ubuntu shell on Windows.
$ conda create -n ray-test python=3.13 -y
$ conda activate ray-test
(ray-test) $ pip install "ray[default]"
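You can quickly confirm the installation worked:

(ray-test) $ python -c "import ray; print(ray.__version__)"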
Code example – counting prime numbers
Let’s revisit the example I gave at the beginning: counting the number of primes within the interval 10,000,000 to 20,000,000.
First, we'll run the original Python code from above, unchanged, and time how long it takes.
And the output?
CPUs~32, chunks=64
total=606028, time=31.17s
Now, can we improve that using Ray? Yes, by following this simple 4-step process.
Step 1 - Initialise Ray. Add these two lines at the start of your code.
import ray
ray.init()
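By default, ray.init() claims every core it finds on the local machine. If you want to hold some back for other work, num_cpus is a standard ray.init() argument:

import ray

# Optionally cap how many cores Ray may use on this machine
ray.init(num_cpus=8)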
Step 2 - Create our remote function. That’s easy. Just decorate the function we want to optimise with the @ray.remote decorator. The function to be decorated is the one that’s performing the most work. In our example, that’s the count_primes function.
@ray.remote(num_cpus=1)
def count_primes(a: int, b: int) -> int:
    ...
Step 3 - Launch the parallel tasks. Call your remote function via Ray's special .remote() method.
refs.append(count_primes.remote(s, e))
Step 4 - Wait for all our tasks to complete. Each Ray task returns an ObjectRef when it's called. This is a promise from Ray: it means the task has been set running remotely, and Ray will deliver its value at some point in the future. We collect the values behind all the ObjectRefs with the ray.get() function, which blocks until every task has completed.
results = ray.get(refs)
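As an aside, ray.get() blocks until every task has finished. If you would rather process results as they complete, ray.wait() supports that. A minimal sketch, assuming refs is the list of ObjectRefs built in step 3:

# ray.wait() returns as soon as num_returns tasks have finished
remaining = list(refs)
total = 0
while remaining:
    ready, remaining = ray.wait(remaining, num_returns=1)
    total += ray.get(ready[0])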
Let’s put this all together. As you will see, the changes to our original code are minimal — just four lines of code added and a print statement to display the number of nodes and cores we’re running on.
import math
import time

# -----------------------------------------
# Change No. 1
# -----------------------------------------
import ray
ray.init()

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

# -----------------------------------------
# Change No. 2
# -----------------------------------------
@ray.remote(num_cpus=1)  # pure-Python loop -> 1 CPU per task
def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = int(ray.cluster_resources().get("CPU", 1))
    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks
    print(f"nodes={len(ray.nodes())}, CPUs~{total_cpus}, chunks={chunks}")

    t0 = time.time()
    refs = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        # -----------------------------------------
        # Change No. 3
        # -----------------------------------------
        refs.append(count_primes.remote(s, e))

    # -----------------------------------------
    # Change No. 4
    # -----------------------------------------
    total = sum(ray.get(refs))
    print(f"total={total}, time={time.time() - t0:.2f}s")
Now, has it all been worthwhile? Let’s run the new code and see what we get.
2025-11-01 13:36:30,650 INFO worker.py:2004 — Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/tom/.local/lib/python3.10/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
warnings.warn(
nodes=1, CPUs~32, chunks=64
total=606028, time=3.04s
Well, the result speaks for itself. The Ray Python code is 10x faster than the regular Python code. Not too shabby.
Where does this increase in speed come from? Ray spreads your workload across all the cores on your system. A core is like a mini-CPU. When we ran our original Python code, it used only one core. That's fine, but if your CPU has more than one core, which most modern PCs do, you're leaving money on the table, so to speak.
In my case, Ray reports 32 logical CPUs, so it's not surprising that the Ray code was far faster than the non-Ray code.
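If you're curious what those numbers look like on your own machine, a quick check after ray.init():

import os
import ray

ray.init()
print(os.cpu_count())                        # logical cores the OS reports
print(ray.cluster_resources().get("CPU"))    # CPUs Ray will schedule onto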
Monitoring Ray jobs
Another point worth mentioning is that Ray makes it very easy to monitor job executions via a dashboard. Notice that the output from our Ray example code included this line,
… — Started a local Ray instance. View the dashboard at 127.0.0.1:8265
It’s showing a local URL link because I’m running this on my desktop. If you were running this on a cluster, the URL would point to a location on the cluster head node.
When you open that URL, you should see something similar to this,
[Ray dashboard main screen. Image by Author]
From this main screen, you can drill down to monitor many aspects of your Ray programs using the menu links at the top of the page.
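If you want the dashboard on a predictable address, ray.init() accepts dashboard arguments (available in recent Ray versions; check the docs for the version you have installed). A sketch:

import ray

# Pin the dashboard port, and expose it beyond localhost if you need to
# reach it from another machine (mind the security implications)
ray.init(dashboard_host="0.0.0.0", dashboard_port=8265)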
Using Ray actors
I previously mentioned that actors are an integral part of Ray Core. Actors are used to coordinate and share state between Ray tasks. For example, say you want to set a global limit that ALL running tasks must adhere to: you have a pool of worker tasks calling an external API, and you want the whole cluster capped at five calls per second. Here is some code you might think would work.
import time
import ray

ray.init(ignore_reinit_error=True, log_to_driver=False)

GLOBAL_QPS = 5.0
INTERVAL = 1.0 / GLOBAL_QPS
next_time = time.time()  # intended to be shared across all tasks... but it isn't!

def acquire():
    # Each Ray worker process gets its OWN copy of next_time,
    # so this only paces calls within a single process
    global next_time
    now = time.time()
    if now < next_time:
        time.sleep(next_time - now)
    next_time = max(next_time + INTERVAL, time.time())

@ray.remote
def call_api_with_limit(n_calls: int) -> int:
    done = 0
    for _ in range(n_calls):
        acquire()  # "global" permission (or so we think)
        done += 1  # pretend API call
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH = 20
    total_calls = NUM_WORKERS * CALLS_EACH
    expected_min_time = total_calls / GLOBAL_QPS

    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0

    print(f"Total calls: {total_calls}")
    print(f"Intended GLOBAL_QPS: {GLOBAL_QPS}")
    print(f"Expected time if truly global-limited: ~{expected_min_time:.2f}s")
    print(f"Actual time with 'global var' (broken): {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls/dt:.1f} (should have been ~{GLOBAL_QPS})")
We have used a global variable to pace the calls, and the code is syntactically correct, running without error. Unfortunately, you won't get the result you expected. That's because each Ray task runs in its own process space and has its own copy of the global variable; the global is NOT shared between tasks. So when we run the above code, we will see output like this,
Total calls: 200
Intended GLOBAL_QPS: 5.0
Expected time if truly global-limited: ~40.00s
Actual time with 'global var' (broken): 3.80s
Observed cluster QPS: ~52.6 (should have been ~5.0)
To fix this, we use an actor. Recall that an actor is just a Ray-decorated Python class. Here is the code with actors.
import time, ray

ray.init(ignore_reinit_error=True, log_to_driver=False)

# This is our actor
@ray.remote
class GlobalPacer:
    """Serialize calls so cluster-wide rate <= qps."""
    def __init__(self, qps: float):
        self.interval = 1.0 / qps
        self.next_time = time.time()

    def acquire(self):
        # Wait inside the actor until we can proceed
        now = time.time()
        if now < self.next_time:
            time.sleep(self.next_time - now)
        # Reserve the next slot; guard against drift
        self.next_time = max(self.next_time + self.interval, time.time())
        return True

@ray.remote
def call_api_with_limit(n_calls: int, pacer) -> int:
    done = 0
    for _ in range(n_calls):
        # Wait for global permission
        ray.get(pacer.acquire.remote())
        # Pretend API call (no extra sleep here)
        done += 1
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH = 20
    GLOBAL_QPS = 5.0  # cluster-wide cap
    total_calls = NUM_WORKERS * CALLS_EACH
    expected_min_time = total_calls / GLOBAL_QPS

    pacer = GlobalPacer.remote(GLOBAL_QPS)

    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH, pacer)
             for _ in range(NUM_WORKERS)])
    dt = time.time() - t0

    print(f"Total calls: {total_calls}")
    print(f"Global QPS cap: {GLOBAL_QPS}")
    print(f"Expected time (if capped at {GLOBAL_QPS} QPS): ~{expected_min_time:.2f}s")
    print(f"Actual time with actor: {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls/dt:.1f}")
Our limiter code is encapsulated in a class (GlobalPacer) decorated with @ray.remote. A single actor instance holds the shared pacing state, and every task coordinates through it, so the cap genuinely applies to all running tasks. We can see the difference this makes by running the updated code.
Total calls: 200
Global QPS cap: 5.0
Expected time (if capped at 5.0 QPS): ~40.00s
Actual time with actor: 39.86s
Observed cluster QPS: ~5.0
Summary
This article introduced Ray, an open-source Python framework that makes it easy to scale compute-intensive programs from a single core to multiple cores or even a cluster with minimal code changes.
I briefly mentioned the key components of Ray—Ray Data, Ray Train, Ray Tune, Ray Serve, and Ray Core—emphasising that Ray Core is ideal for general-purpose CPU scaling.
I explained some of the essential concepts in Ray Core: tasks (stateless parallel functions), actors (stateful workers for shared state and coordination), and ObjectRefs (a future promise of a task's return value).
To showcase the advantages of using Ray, I began with a simple CPU-intensive example — counting prime numbers over a range — and showed how running it on a single core can be slow with a naive Python implementation.
Instead of rewriting the code in another language or using complex multiprocessing libraries, Ray lets you parallelise the workload in four simple steps and just a few extra lines of code:
- ray.init() to start Ray
- Decorate your functions with @ray.remote to turn them into parallel tasks
- .remote() to launch tasks concurrently, and
- ray.get() to collect task results.
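Put together, the whole pattern fits in a few lines (work is an illustrative name of my own):

import ray

ray.init()                                   # 1. start Ray

@ray.remote                                  # 2. turn a function into a task
def work(x: int) -> int:
    return x * x

refs = [work.remote(i) for i in range(8)]    # 3. launch tasks concurrently
print(ray.get(refs))                         # 4. block and collect the results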
This approach cut the runtime of the prime-counting example from ~31 seconds to ~3 seconds on a machine with 32 logical cores.
I also mentioned how easy it is to monitor running jobs in Ray using its built-in dashboard and showed how to access it.
Finally, I provided an example of using a Ray Actor by showing why global variables are not suitable for coordinating across tasks, since each worker has its own memory space.
In the second part of this series, we’ll see how to take things to another level by enabling Ray jobs to use even more CPU power as we scale to large multi-node servers in the cloud via Amazon Web Services.

