# pre-commit (Guilhem Saurel, 2022-04-13)
c1c0b78ff9371b640892568b797ac3ad2f35fbc4
pre-commit:
  variables:
    CACHE: ${CI_PROJECT_DIR}/.cache
    PIP_CACHE: ${CACHE}/pip
    PRE_COMMIT_HOME: ${CACHE}/pre-commit
  cache:
    paths:
      - ${CACHE}
  image: python
  before_script: python -m pip --cache-dir ${PIP_CACHE} install pre-commit
  script: pre-commit run -a
repos:
  - repo: https://github.com/PyCQA/isort
    rev: 5.12.0
    hooks:
      - id: isort
        args:
          - --filter-files
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.1.1
    hooks:
      - id: ruff
        args:
          - --fix
          - --exit-non-zero-on-fix
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: check-added-large-files
      - id: check-ast
      - id: check-executables-have-shebangs
      - id: check-json
      - id: check-merge-conflict
      - id: check-symlinks
      - id: check-toml
      - id: check-yaml
      - id: debug-statements
      - id: destroyed-symlinks
      - id: detect-private-key
      - id: end-of-file-fixer
      - id: fix-byte-order-marker
      - id: mixed-line-ending
      - id: trailing-whitespace
  - repo: https://github.com/psf/black
    rev: 23.10.1
    hooks:
      - id: black
BSD 2-Clause License
Copyright (c) 2020, Gepetto
Copyright (c) 2020-2022, CNRS
All rights reserved.
Redistribution and use in source and binary forms, with or without
......
# Multiprocessing Examples
[![SWH](https://archive.softwareheritage.org/badge/swh:1:rel:b4f547af5648cc200683763f4d588f74c4cf3dcc/)](https://archive.softwareheritage.org/swh:1:rel:b4f547af5648cc200683763f4d588f74c4cf3dcc;origin=https://gitlab.laas.fr/gepetto/multiprocessing-examples;visit=swh:1:snp:c47174f619666138647616ef1f5186c7748cd329)
Several members of the team need to launch processes that require hours or days while taking all the resources of one CPU
core to complete.
We can split the required time by using more CPU cores.
......
{
  "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
  "@type": "SoftwareSourceCode",
  "license": "https://spdx.org/licenses/BSD-2-Clause",
  "codeRepository": "git+https://gitlab.laas.fr/gepetto/multiprocessing-examples.git",
  "dateCreated": "2020-06-09",
  "datePublished": "2022-06-14",
  "dateModified": "2022-06-14",
  "issueTracker": "https://gitlab.laas.fr/gepetto/multiprocessing-examples/-/issues",
  "name": "multiprocessing examples",
  "version": "1.0.0",
  "description": "Examples for the multiprocessing standard module",
  "applicationCategory": "Computing",
  "developmentStatus": "concept",
  "funder": {
    "@type": "Organization",
    "name": "LAAS-CNRS"
  },
  "keywords": [
    "Python",
    "HPC",
    "multiprocessing",
    "GPU"
  ],
  "programmingLanguage": [
    "Python"
  ],
  "operatingSystem": [
    "Linux"
  ],
  "author": [
    {
      "@type": "Person",
      "@id": "https://orcid.org/0000-0002-0867-1664",
      "givenName": "Guilhem",
      "familyName": "Saurel",
      "email": "guilhem.saurel@laas.fr",
      "affiliation": {
        "@type": "Organization",
        "name": "LAAS-CNRS"
      }
    },
    {
      "@type": "Person",
      "@id": "https://orcid.org/0000-0002-8090-0601",
      "givenName": "Nicolas",
      "familyName": "Mansard",
      "email": "nicolas.mansard@laas.fr",
      "affiliation": {
        "@type": "Organization",
        "name": "LAAS-CNRS"
      }
    },
    {
      "@type": "Person",
      "givenName": "David",
      "familyName": "Gauchard",
      "email": "david.gauchard@laas.fr",
      "affiliation": {
        "@type": "Organization",
        "name": "LAAS-CNRS"
      }
    }
  ]
}
......@@ -9,7 +9,7 @@ WORKDIR /app
ADD manager.py minion.py work.py ./
ARG MANAGER_HOST
ENV MANAGER_HOST=$MANAGER_HOST
CMD /app/minion.py
......@@ -11,11 +11,20 @@ In one shell: `./manager.py`
In another: `./boss.py`
And in many others: `./minion.py`
## Check how many tasks are still in the queue
```python
>>> from manager import QueueClient
>>> c = QueueClient()
>>> c.queue.qsize()
100
```
## Test on multiple computers
### Network architecture
The architecture will run 3 very different processes: the manager, the boss and the minions.
You should first decide which computers you are going to use.
There is no official list of available hosts at Gepetto; you should decide which ones to use with Guilhem.
......@@ -26,7 +35,7 @@ Most of the load is taken by the minions, so they should go on the computers whe
The manager should be "seen" by all the processes. So all the computers for minions and boss should be able to ping the computer where the manager is running.
The boss is going to send the requests to the manager and wait to collect the results. The boss typically runs on your personal computer. This computer can be off the main network (the manager does not have to be able to ping the boss), and can even be outside of the lab if the network port is open to ssh. See port forwarding below.
### Port forwarding
......@@ -35,34 +44,33 @@ You may consider to run the boss on another network, for example from home, with
Yet, the manager/boss code is written to use port 7481 which is not open for remote connection at the lab.
You must then forward connection to port 7481 using:
```bash
ssh -L 7481:localhost:7481 $USER@nyusan.laas.fr
```
Now, the boss can contact the manager by simply connecting to localhost (and this will be forwarded by ssh to nyusan).
### Prepare your container
The docker container should be rebuilt explicitly for the host of your manager.
After being rebuilt, it has to be pushed to a container registry. Several options might be considered.
The most convenient way is likely to use the registry associated with the gitlab repository where this code is stored.
For example, clone this repository on your gitlab.laas.fr account, and use the registry available at gitlab.laas.fr:4567/$USER/multiprocessing-examples .
Then, the full docker name for your container will be gitlab.laas.fr:4567/$USER/multiprocessing-examples:EXAMPLE-NAME .
(change $USER to your login e.g. gsaurel or nmansard, and EXAMPLE-NAME to anything relevant to you e.g. nmansard-example-1)
You will likely first have to log in to the registry with docker. For example, if using the gitlab.laas.fr registry, type:
```bash
docker login gitlab.laas.fr:4567
```
(don't forget to docker-login on each computer where you will use this registry)
The build and push of the container are then to be run from this directory (on any computer you want, for example your personal computer).
```bash
docker build --build-arg MANAGER_HOST=nyusan -t gitlab.laas.fr:4567/$USER/multiprocessing-examples:EXAMPLE-NAME .
docker push gitlab.laas.fr:4567/$USER/multiprocessing-examples:EXAMPLE-NAME
```
......
......@@ -10,11 +10,11 @@ class Boss(QueueClient):
    def run(self):
        while True:
            # random.randint requires integer bounds (a float like 1e4 is an error)
            task = random.randint(5, 10_000)
            print("new task:", task)
            self.queue.put(task)
            print("tasks left:", self.queue.qsize())
            time.sleep(random.randint(1, 2))


if __name__ == "__main__":
    Boss().run()
......@@ -5,24 +5,32 @@ import os
from multiprocessing.managers import BaseManager

PORT = 7481
KEY = b"AiZa5Uavcoh3PiajvaeTee5z"  # keep it secret, keep it safe !


class QueueManager(BaseManager):
    """This Manager holds a Queue and waits for clients to use it."""


class QueueClient:
    """Base class for users of the Queue."""

    def __init__(self):
        QueueManager.register("get_queue")
        manager = QueueManager(
            address=(os.environ.get("MANAGER_HOST", "localhost"), PORT), authkey=KEY
        )
        manager.connect()
        self.queue = manager.get_queue()


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    QueueManager.register("get_queue", callable=lambda: queue)
    try:
        QueueManager(address=("", PORT), authkey=KEY).get_server().serve_forever()
    finally:
        print()
        print(f"exiting with approximately {queue.qsize()} items left in queue")
#!/usr/bin/env python3
import random
import time
from manager import QueueClient
......@@ -12,10 +11,13 @@ class Minion(QueueClient):
        while True:
            task = self.queue.get()
            start = time.perf_counter()
            print(f"start work on task {task}...")
            result = find_prime(task)
            print(
                f"Done! The {task}-th prime number is {result} "
                f"(found in {time.perf_counter() - start:.3f}s)"
            )


if __name__ == "__main__":
    Minion().run()
......@@ -16,9 +16,9 @@ def find_prime(goal: int = 10, verbose: bool = False) -> int:
        if is_prime(i):
            found += 1
            if verbose:
                print(f"the prime number n°{found} is {i}")
    return i


if __name__ == "__main__":
    print(find_prime(50, verbose=True))
......@@ -9,7 +9,7 @@ WORKDIR /app
ADD boss.py manager.py minion.py task.py ./
ARG MANAGER_HOST
ENV MANAGER_HOST=$MANAGER_HOST
CMD /app/minion.py
......@@ -12,13 +12,13 @@ And in many others: `./minion.py`
## Test on multiple computers
We suppose that the host of the manager will be MANAGER_HOST (e.g. nyusan, *CHECK AVAILABILITY* before using it), user is $USER and docker registry gitlab.laas.fr:4567/$USER/multiprocessing-examples
First build and push the docker.
```bash
IMAGE="gitlab.laas.fr:4567/$USER/multiprocessing-examples:example-2"
docker build --build-arg MANAGER_HOST=nyusan -t "$IMAGE" .
docker login gitlab.laas.fr:4567 # only required once
docker push "$IMAGE"
```
......
#!/usr/bin/env python3
import numpy as np
import time
from manager import QueueClient
from task import Task
......@@ -8,6 +8,7 @@ from task import Task
class Boss(QueueClient):
    def run(self, tasks=100):
        start = time.perf_counter()

        # create some tasks
        for i in range(tasks):
            self.tasks.put(Task(i))
......@@ -15,9 +16,14 @@ class Boss(QueueClient):
        results = []
        for _ in range(tasks):
            result = self.results.get()
            print(
                f"got result {result.identifier} of size {result.size} "
                f"processed in {result.time:.3f} s"
            )
            results.append(result)

        end = time.perf_counter()
        print(f"Got all {tasks} results in {end - start:.3f} s!")


if __name__ == "__main__":
    Boss().run()
......@@ -5,28 +5,39 @@ import os
from multiprocessing.managers import BaseManager

PORT = 7481
KEY = b"AiZa5Uavcoh3PiajvaeTee5z"  # keep it secret, keep it safe !


class QueueManager(BaseManager):
    """This Manager holds a Queue and waits for clients to use it."""


class QueueClient:
    """Base class for users of the Queue."""

    def __init__(self):
        QueueManager.register("get_tasks")
        QueueManager.register("get_results")
        manager = QueueManager(
            address=(os.environ.get("MANAGER_HOST", "localhost"), PORT), authkey=KEY
        )
        manager.connect()
        self.tasks = manager.get_tasks()
        self.results = manager.get_results()


if __name__ == "__main__":
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    QueueManager.register("get_tasks", callable=lambda: task_queue)
    QueueManager.register("get_results", callable=lambda: result_queue)
    try:
        QueueManager(address=("", PORT), authkey=KEY).get_server().serve_forever()
    finally:
        print()
        print(
            f"exiting with approximately {task_queue.qsize()} items left in task queue"
            f" and {result_queue.qsize()} items left in result queue."
        )
......@@ -7,11 +7,11 @@ class Minion(QueueClient):
    def run(self):
        while True:
            task = self.tasks.get()
            print(f"start work on task {task.identifier} of size {task.size}...")
            task.work()
            print(f"Done in {task.time:.3f}s")
            self.results.put(task)


if __name__ == "__main__":
    Minion().run()
......@@ -7,7 +7,7 @@ class Task:
    def __init__(self, identifier):
        self.identifier = identifier
        # choose the size of the problem
        self.size = np.random.randint(300, 3_000)
        # Generate the input of the problem
        self.a = np.random.rand(self.size, self.size)
        self.b = np.random.rand(self.size)
......@@ -17,6 +17,5 @@ class Task:
    def work(self):
        start = time.perf_counter()
        self.x = np.linalg.solve(self.a, self.b)
        self.time = time.perf_counter() - start
......@@ -7,8 +7,8 @@ RUN /setup.sh
ENV PATH=/opt/openrobots/bin:$PATH \
LD_LIBRARY_PATH=/opt/openrobots/lib
ARG MANAGER_HOST
ENV MANAGER_HOST=$MANAGER_HOST
WORKDIR /app
......
......@@ -3,9 +3,7 @@
import time
import numpy as np
import pinocchio as pin
from manager import QueueClient
from task import Task, TaskParameters
......@@ -20,16 +18,19 @@ class Boss(QueueClient):
        results = []
        for _ in qs:
            result = self.results.get()
            print(f"got result {result.identifier} processed in {result.time:.3f}s")
            results.append(result)
        print(f"Got all {len(results)} results!")
if __name__ == "__main__":
    # Allocate data to be processed.
    NDATA = 20
    qs = np.array(
        [pin.randomConfiguration(TaskParameters.robot.model) for _ in range(NDATA)]
    )
    vqs = np.random.rand(NDATA, TaskParameters.robot.model.nv) * 2 - 1
    aqs = np.random.rand(NDATA, TaskParameters.robot.model.nv) * 2 - 1
......@@ -37,4 +38,4 @@ if __name__ == '__main__':
    Boss().run(qs, vqs, aqs)
    total = time.perf_counter() - start
    print(f"Completed in {total:.3f} secs")