#celery

      • foist
        How do I pass an argument through `group()` to each task in that group? For instance, if I want all tasks to receive `foo='bar'`.
      • asksol_
        I know kwargs can be passed in 4.0 (master): `g.apply_async(foo='bar')`. If it doesn't work in 3.1, then the only solution is: `for task in g.tasks: task.kwargs['foo'] = 'bar'`
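A rough sketch of that 3.1 workaround, without Celery itself: `Sig` below is a hypothetical stand-in for a task signature (a callable plus stored args/kwargs), used only to show the shape of mutating each subtask's kwargs before the group is applied.

```python
# Minimal stand-in for a Celery signature; real code would use
# celery.group and task.s(...) signatures instead of this class.
class Sig:
    def __init__(self, fn, *args, **kwargs):
        self.fn, self.args, self.kwargs = fn, args, dict(kwargs)

    def apply(self):
        return self.fn(*self.args, **self.kwargs)

def add(x, y, foo=None):
    return (x + y, foo)

tasks = [Sig(add, i, i) for i in range(3)]

# The workaround: set the keyword on every subtask before dispatch.
for task in tasks:
    task.kwargs['foo'] = 'bar'

results = [t.apply() for t in tasks]
# results == [(0, 'bar'), (2, 'bar'), (4, 'bar')]
```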
      • foist
        asksol_: thanks, I will try
      • asksol_: my line looks like this: `my_method.chunks(ids, chunks_per_group).group()`
      • How would I pass it in there?
      • asksol_
        .group() returns a group, so g = my_method.chunks(ids, chunks_per_group).group()
      • foist
        asksol_: does it return a single group, or a list of groups?
      • asksol_
        oh, it's a single group of map tasks
      • starmap to be exact, so I guess it's more difficult for chunks
      • foist
        asksol_: so what’s wrong with this? http://dpaste.com/25EXHWM
      • E TypeError: xstarmap() got an unexpected keyword argument 'source_name'
      • asksol_
        starmap doesn't take any keyword arguments, it just takes a list of arguments
      • foist
        def my_method(self, source_name, *ids_list):
      • I’m just trying to figure out how to pass `source_name` in.
      • asksol_
        starmap(add.s(), [(2, 2), (4, 4), (5, 5)]) -> [add(2, 2), add(4, 4), add(5, 5)]
      • you would need to pass it in as a positional argument
      • foist
        And where would I do that within my code?
      • asksol_
        It's not "pure", but I guess it wouldn't hurt for starmap/map to support keyword arguments, though it's too late for 3.1 anyway
      • if you have a task x(a, foo=None), then x.chunks([(arg, foo) for arg in arguments], 10)
      • x.chunks(((arg, foo) for arg in arguments), 10) is lazy
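In plain Python terms, the trick being described (for a hypothetical task `x(a, foo=None)`) is to fold the keyword into each positional tuple, which is what starmap-based chunks expect:

```python
from itertools import starmap

def x(a, foo=None):
    return (a, foo)

arguments = [1, 2, 3]
foo = 'bar'

# Build (arg, foo) tuples so foo travels as a positional argument.
args = [(arg, foo) for arg in arguments]
results = list(starmap(x, args))
# results == [(1, 'bar'), (2, 'bar'), (3, 'bar')]
```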
      • foist
        The many levels of nesting confuse the hell out of me.
      • asksol_: thanks, it worked!
      • asksol_: could you explain to me what the point of chunks is? I am not sure I get it. Why split the work into multiple pieces when they’re all going to be iterated over anyway? Is it so each piece can be sent to a different worker, possibly one that is on another server?
      • asksol_
        no, chunks is for when you have 1000 items, but instead of using 1000 tasks to process them you want to have 10 tasks processing 100 items each
      • foist
        asksol_: again, what’s the point? You’re still going to go through 1000 items
      • asksol_
        messaging overhead, cpu caches, and data locality, etc
      • foist
        asksol_: Sorry, I don’t get it yet. A task is a callable that has been turned into a class. Messages are put in the queue, and are retrieved and acknowledged by workers, thereby removing them from the queue. A message contains the necessary information for invoking the callable class, like args to be passed in, as well as some metadata about the execution, like the number of retries. When a worker gets the message, the call to the task is “built” and issued; i.e. the task is called.
      • asksol_: A message is put in the queue for each time the callable needs to be called. If there are 1000 items, 1000 different messages are put into the queue, resulting in 1000 tasks. …I feel like this is where my understanding breaks down. Or have I been wrong the whole time?
      • asksol_
        if you do `group(add.s(i, i) for i in range(1000)).delay()` there are 1000 messages; if you do `add.chunks(((i, i) for i in range(1000)), 100)` there will be only 10 tasks.
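The difference in message counts can be sketched without Celery at all; `chunked` below just slices the argument list the way `chunks` groups work:

```python
def chunked(items, n):
    """Split items into lists of at most n elements."""
    items = list(items)
    return [items[i:i + n] for i in range(0, len(items), n)]

args = [(i, i) for i in range(1000)]

# group(add.s(i, i) for i in range(1000)) -> one message per item
group_messages = len(args)             # 1000

# add.chunks(args, 100) -> one task per chunk of 100
chunk_tasks = len(chunked(args, 100))  # 10

# Each chunk task still performs 100 calls, but only 10 messages
# travel through the broker instead of 1000.
```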
      • there's no real task context in this case, so retries will not work
      • foist
        asksol_: okay, retries are irrelevant. I don’t get it though. add() will still have to be executed 1000 times. What’s the difference?
      • I’m not trying to be dense. I’ve read much of the docs and have even used Celery with some success in the past, but I don’t get the benefits of chunking. I don’t see where there are savings, I guess.
      • asksol_
        depends on what add() is doing, you know that the items will be processed by the same process and things may be in cache
      • messaging also adds overhead, so in the add-numbers case doing 100 additions in every task will be a whole lot faster.
      • if you have 10000 tasks adding numbers it's from 2 s down to 0.001 s
      • foist
        So one task can result in a number of calls with different arguments to the same method?
      • asksol_
        if you have a task sending 100k urls to fetch, you can sort the chunks so that domains will be in the DNS cache
      • yeah, it's like [map(task, chunked_args) for chunked_args in chunks(args, 100)]
      • where map is [task(args) for args in chunked_args] and starmap is [task(*args) for args in chunked_args]
      • and of course, task(*args) calls the function inline, as opposed to task.delay(*args) :)
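The pseudocode above runs directly as plain Python, which makes the map/starmap distinction concrete (`first` here is a made-up task that takes the whole tuple as one argument):

```python
def add(x, y):
    return x + y

chunked_args = [(2, 2), (4, 4), (5, 5)]

# starmap semantics: task(*args) for each tuple in the chunk
star_results = [add(*args) for args in chunked_args]
# star_results == [4, 8, 10]

# map semantics: task(args), i.e. the task receives the tuple itself
def first(pair):
    return pair[0]

map_results = [first(args) for args in chunked_args]
# map_results == [2, 4, 5]
```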
      • foist
        Thanks, asksol_. I have a lot to learn.
      • asksol_
        lucky for us, that never ends
      • foist
        Is there a good intro to Celery video or book you know of?
      • Something that covers all bases a bit?
      • asksol_
        I'm not aware of anything like that, but this is more of a special case. You don't need it unless you have a reason to optimize something
      • foist
        asksol_: I’m trying to build a syncing system (data between two servers, inconsistently formatted) and Celery is involved. I’ve used chunks because I assumed it would help me optimize request speed and volume once I gather some performance data and see how things are running.
      • asksol_
        I wish there was a good resource on distributed event systems, actors, task granularity (which is part of the current topic), and all the stuff that Celery builds on, but there's very little, so it's hard to point to anything
      • you can look up 'task granularity' to get some information about how tasks should be composed in parallel and distributed environments
      • foist
        In my particular case, server x is receiving a list of ints, and then it sends a GET request to server y, using the ints list as the query param. I’m chunking the list of ints.
      • So that I can control how many are put into a single GET request
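A hedged sketch of that pattern (the URL and `BATCH_SIZE` are made up for illustration): chunk the list of ints, then build one GET query string per chunk.

```python
from urllib.parse import urlencode

BATCH_SIZE = 3  # hypothetical: how many ids go into one GET request

def chunked(items, n):
    """Split items into lists of at most n elements."""
    return [items[i:i + n] for i in range(0, len(items), n)]

ids = [10, 11, 12, 13, 14, 15, 16]

# One query string per chunk; each would become a separate GET to server y.
urls = [
    "https://server-y.example/sync?" + urlencode({"ids": ",".join(map(str, chunk))})
    for chunk in chunked(ids, BATCH_SIZE)
]
# 7 ids in batches of 3 -> 3 requests
```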
      • asksol_
        you can always benchmark it, sounds like it could be beneficial
      • especially if there are thousands of ints
      • foist
        Could be many more.
      • asksol_
        since there's I/O you may get better parallelization if you use one task per request (then again, if there are only 10 child processes (`-c 10`) in total you would be maxed out already)
      • foist
        Alright. I’m going to sleep now. I’ll surely be back with more troubles sometime soon. Thanks again, asksol_.
      • asksol_
      • if you have 1000 different urls, then you may want to optimize by sending requests to the same host over a shared keep-alive connection
      • good night :)