How do I pass an argument through `group()` to each task in that group? For instance, if I want all tasks to receive `foo='bar'`.
asksol_
I know args can be passed in 4.0 (master): g.apply_async(foo='bar'); if it doesn't work in 3.1 then the only solution is to do for task in g.tasks: task.kwargs['foo'] = 'bar'
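[editor's note: a broker-free sketch of the 3.1 workaround above; `Sig` is a stand-in for a Celery signature (only its `kwargs` dict matters here), not the real API]

```python
# Model of mutating each task signature's kwargs before applying a group.
# `Sig` is a hypothetical stand-in for a Celery signature; real code
# would iterate over g.tasks, which are signature objects with .kwargs.
class Sig:
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

tasks = [Sig(i) for i in range(3)]   # imagine these came from g.tasks
for task in tasks:
    task.kwargs['foo'] = 'bar'
```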
foist
asksol_: thanks, I will try
asksol_: my line looks like this: `my_method.chunks(ids, chunks_per_group).group()`
How would I pass it in there?
asksol_
.group() returns a group, so g = my_method.chunks(ids, chunks_per_group).group()
foist
asksol_: does it return a single group, or a list of groups?
asksol_
oh, it's a single group of map tasks
starmap to be exact, so I guess it's more difficult for chunks
you would need to pass it in as a positional argument
foist
And where would I do that within my code?
asksol_
It's not "pure", but I guess it wouldn't hurt for starmap/map to support keyword arguments, but too late for 3.1 anyway
if you have a task x(a, foo=None), then x.chunks([(arg, foo) for arg in arguments], 10)
x.chunks(((arg, foo) for arg in arguments), 10) is lazy
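[editor's note: a runnable model of packing `foo` into the positional tuples; `chunks` here is a plain-Python stand-in for Celery's primitive, and the argument values are illustrative]

```python
def chunks(it, n):
    # Plain-Python stand-in for Celery's chunks: split an iterable of
    # argument tuples into lists of at most n tuples.
    items = list(it)
    return [items[i:i + n] for i in range(0, len(items), n)]

def x(a, foo=None):
    # Stand-in for the example task x(a, foo=None).
    return (a, foo)

arguments = [1, 2, 3, 4]
foo = 'bar'
chunked = chunks(((arg, foo) for arg in arguments), 2)
# starmap semantics: call x(*args) for each tuple in each chunk
results = [[x(*args) for args in chunk] for chunk in chunked]
```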
foist
The many levels of nesting confuse the hell out of me.
asksol_: thanks, it worked!
asksol_: could you explain to me what the point of chunks is? I am not sure I get it. Why split the work into multiple piece when they’re all going to be iterated over anyways? Is it so each piece can be sent to a different worker, possibly one that is on another server?
asksol_
no, chunks is for when you have 1000 items, but instead of using 1000 tasks to process them you want to have 10 tasks processing 100 items each
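[editor's note: the splitting itself is just list slicing; a minimal illustration of the idea, not Celery internals]

```python
# 1000 items split into pieces of 100: each piece becomes one task's work.
items = list(range(1000))
size = 100
pieces = [items[i:i + size] for i in range(0, len(items), size)]
```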
foist
asksol_: again, what’s the point? You’re still going to go through 1000 items
asksol_
messaging overhead, cpu caches, and data locality, etc
foist
asksol_: Sorry, I don’t get it yet. A task is a callable that has been turned into a class. Messages are put in the queue and are retrieved and acknowledged by workers, thereby removing them from the queue. A message contains the necessary information for invoking the callable class, like args to be passed in, as well as some metadata about the execution, like the number of retries. When a worker gets the message, the call to the task is “built” and issued; i.e., the task is called.
asksol_: A message is put in the queue for each time the callable needs to be called. If there are 1000 items, 1000 different messages are put into the queue, resulting in 1000 tasks. …I feel like this is where my understanding breaks down. Or have I been wrong the whole time?
asksol_
if you do group(add.s(i, i) for i in range(1000)).delay() there are 1000 messages; if you do add.chunks(((i, i) for i in range(1000)), 100) there will be 10 tasks.
there's no real task context in this case, so retries will not work
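[editor's note: the difference in queue traffic can be counted directly, under the assumption that one signature publishes one message]

```python
import math

n_items, chunk_size = 1000, 100
# group(add.s(i, i) for i in range(1000)).delay(): one message per signature
group_messages = n_items
# add.chunks(((i, i) for i in range(1000)), 100): one message per chunk task
chunk_messages = math.ceil(n_items / chunk_size)
```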
foist
asksol_: okay, retries are irrelevant. I don’t get it though. add() will still have to be executed 1000 times. What’s the difference?
I’m not trying to be dense. I’ve read much of the docs and have even used Celery with some success in the past, but I don’t get the benefits of chunking. I don’t see where there are savings, I guess.
asksol_
depends on what add() is doing, you know that the items will be processed by the same process and things may be in cache
messaging also adds overhead, so in the add numbers case doing 1000 additions in every task will be a whole lot faster.
100 even
if you have 10000 tasks adding numbers it's from 2 s down to 0.001 s
foist
So one task can result in a number of calls with different arguments to the same method?
asksol_
if you have a task sending 100k urls to fetch, you can sort the chunks so that domains will be in the DNS cache
yeah, it's like [map(task, chunked_args) for chunked_args in chunks(args, 100)]
where map is [task(args) for args in chunked_args] and starmap is [task(*args) for args in chunked_args]
and of course, task(*args) calls the function inline, as opposed to task.delay(*args) :)
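[editor's note: those list-comprehension lines can be written out as a runnable model, with a plain function standing in for the task and the chunk size chosen for illustration]

```python
def chunks(args, n):
    # Split the full argument list into sublists of at most n tuples.
    items = list(args)
    return [items[i:i + n] for i in range(0, len(items), n)]

def starmap(task, chunked_args):
    # starmap: call task(*args) inline for each tuple in the chunk.
    return [task(*args) for args in chunked_args]

def add(a, b):
    return a + b

args = [(i, i) for i in range(10)]
results = [starmap(add, chunk) for chunk in chunks(args, 5)]
```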
foist
Thanks, asksol_. I have a lot to learn.
asksol_
lucky for us, that never ends
foist
Is there a good intro to Celery video or book you know of?
Something that covers all bases a bit?
asksol_
I'm not aware of anything like that, but this is more of a special case. You don't need it unless you have a reason to optimize something.
foist
asksol_: I’m trying to build a syncing system (data between two servers, inconsistently formatted) and Celery is involved. I’ve used chunks because I assumed it would help me optimize request speed and volume once I gather some performance data and see how things are running.
asksol_
I wish there was a good resource on distributed event systems, actors, task granularity (which is part of the current topic), and all the stuff that celery builds on, but there's very little, so it's hard to point to anything
you can look up 'task granularity' to get some information about how tasks should be composed in parallel and distributed environments
foist
In my particular case, server x is receiving a list of ints, and then it sends a GET request to server y, using the ints list as the query param. I’m chunking the list of ints.
So that I can control how many are put into a single GET request
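[editor's note: a hedged sketch of batching ids into query strings; the parameter name `ids` and the comma-joined format are assumptions about server y's API, not from the conversation]

```python
from urllib.parse import urlencode

def batched_queries(ids, per_request):
    # Yield one query string per chunk of ids, so the chunk size directly
    # controls how many ids go into a single GET request.
    for i in range(0, len(ids), per_request):
        yield urlencode({'ids': ','.join(map(str, ids[i:i + per_request]))})

queries = list(batched_queries(list(range(5)), 2))
```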
asksol_
you can always benchmark it, sounds like it could be beneficial
especially if there are thousands of ints
foist
Could be many more.
asksol_
since there's I/O you may get better parallelization if you use one task per request (then again, if there's only 10 child processes (-c 10) in total you would be maxed out already)
foist
Alright. I’m going to sleep now. I’ll surely be back with more troubles sometime soon. Thanks again, asksol_.
asksol_
if you have 1000 different urls, then you may want to optimize by sending requests for the same host over one keep-alive connection