Code Coverage
 
Classes and Traits
Functions and Methods
Lines
Total
0.00% covered (danger)
0.00%
0 / 1
20.00% covered (danger)
20.00%
2 / 10
CRAP
29.79% covered (danger)
29.79%
28 / 94
TrackerPullProcess
0.00% covered (danger)
0.00%
0 / 1
20.00% covered (danger)
20.00%
2 / 10
104.61
29.79% covered (danger)
29.79%
28 / 94
 __construct
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
10 / 10
 handle
0.00% covered (danger)
0.00%
0 / 1
10.31
26.67% covered (danger)
26.67%
8 / 30
 normalizeConnectionConfig
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
8 / 8
 setMongoConfig
0.00% covered (danger)
0.00%
0 / 1
1.04
66.67% covered (warning)
66.67%
2 / 3
 getData
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 9
 infoHeader
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 6
 insert
0.00% covered (danger)
0.00%
0 / 1
6
0.00% covered (danger)
0.00%
0 / 13
 delete
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 7
 getMinMax
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 3
 getQuery
0.00% covered (danger)
0.00%
0 / 1
12
0.00% covered (danger)
0.00%
0 / 5
1<?php
2
3namespace Qmp\Laravel\Collect\Console\Commands;
4
5use Exception;
6use MongoDB\BSON\ObjectId;
7use Illuminate\Support\{Collection, Str};
8use Illuminate\Support\Facades\{DB, Log, Redis};
9use Qmp\Laravel\CommandsLaravel\Middleware\Library\Monitoring\{AddSubProcess, UpdateSubProcess};
10use Qmp\Laravel\DBConnector\Config;
11use Qmp\Laravel\DBConnector\Connectors\MongoConnector;
12use Qmp\Laravel\MicroService\Client\Client;
13use Qmp\Laravel\MicroService\Client\Tools\Request as ClientRequest;
14use Qmp\Laravel\AsyncProcessFactory\Traits\ProcessFactoryStatusModel;
15use Qmp\Laravel\ToolsLaravel\Traits\Timer;
16
/**
 * Console command that pulls entries from a remote tracker MongoDB,
 * copies them chunk by chunk into a local Mongo collection and then
 * deletes the copied range on the tracker.
 *
 * While a pull is in progress, the `_id` range being processed is stored in
 * Redis under the tracker name; getQuery() reads it back so that a later run
 * targets the same range as long as the key is still present.
 */
class TrackerPullProcess extends \Commando
{
    use Timer, ProcessFactoryStatusModel;

    /**
     * Name of the local Mongo connection the pulled entries are written to.
     *
     * @var string
     */
    public const CONNECTION = 'collectMongo';

    /**
     * The distant collection name, also used as prefix for the local collection.
     *
     * @var string
     */
    public const COLLECTION = 'data_collector';

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'collect:tracker-pull-process {id} {parentId}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Pull all data from trackers';

    /**
     * The mongo connector used on the tracker.
     *
     * @var MongoConnector
     */
    protected $mongo;

    /**
     * The Redis connection named "tracker", used to remember the `_id`
     * range currently being pulled.
     *
     * @var mixed connection object returned by Redis::connection()
     */
    protected $redis;

    /**
     * The distant database.
     *
     * @var string
     */
    protected $database = 'tracker';

    /**
     * Number of entries per chunk when inserting locally and deleting remotely.
     *
     * @var int
     */
    protected $chunkSize = 10000;

    /**
     * Maximum number of entries pulled from the tracker in one run.
     *
     * @var int
     */
    protected $limit = 100000;

    /**
     * The insert message (sprintf format: entry count, chunk number).
     *
     * @var string
     */
    protected $messageInsert = "<info>Inserting %s entries from chunk #%s ... </info> ";

    /**
     * The delete message (sprintf format: chunk number).
     *
     * @var string
     */
    protected $messageDelete = "<info>Deleting chunk #%s on tracker ... </info>";

    /**
     * Data holder of the UpdateSubProcess middleware. This command writes
     * its `log` array and its `error` property.
     *
     * @var object
     */
    protected $middlewareDataUpdate;

    /**
     * Class Constructor
     *
     * Builds the tracker connector, opens the Redis connection and registers
     * the sub-process middlewares.
     */
    public function __construct()
    {
        parent::__construct();

        $this->mongo = new MongoConnector();
        $this->mongo->collection(self::COLLECTION);
        $this->redis = Redis::connection('tracker');

        // Fetched before the middlewares are registered; the order of these
        // calls is kept as-is because the middleware helpers are not visible here.
        $this->middlewareDataUpdate = $this->middlewareData(UpdateSubProcess::class)->get();
        $this->middlewareDataUpdate->log['collections'] = [];

        $middlewares = [
            AddSubProcess::class,
            UpdateSubProcess::class,
        ];

        $this->middleware($middlewares);
        $this->middlewareData($middlewares)->createId();
    }

    /**
     * Execute the console command.
     *
     * Pulls up to $limit entries from the tracker, then inserts/deletes them
     * chunk by chunk. A failing chunk is logged and skipped so the remaining
     * chunks are still processed (best effort).
     *
     * @return void
     * @throws Exception when the tracker configuration cannot be resolved
     */
    public function handle(): void
    {
        $this->startTimer('total');

        $connection = $this->normalizeConnectionConfig($this->argument('id'));
        $trackerName = $connection->get('name');

        try {
            $this->setMongoConfig($connection);
            $this->infoHeader($trackerName);

            $data = $this->getData($trackerName);
            $total = $data->count();

            if ($total) {
                // Remember the pulled range so getQuery() can target it again.
                $this->redis->set($trackerName, $this->getMinMax($data)->toJson());

                $this->startTimer('insert');

                $data->chunk($this->chunkSize)->each(function ($chunk, $key) use ($trackerName) {
                    try {
                        // delete() only runs when insert() succeeded.
                        $this->insert($chunk, $key)
                            ->delete($chunk, $key);
                    } catch (Exception $e) {
                        // Best effort: keep processing the other chunks, but make
                        // the failure visible in the logs instead of hiding it at
                        // debug level.
                        Log::error(sprintf(
                            'Tracker "%s": chunk #%s failed: %s',
                            $trackerName,
                            $key,
                            $e->getMessage()
                        ));
                    }

                    $this->error($this->getTimer('insert', true));
                    $this->line('');
                });

                $this->redis->del($trackerName);
            } else {
                $this->info('Up to date !');
                $this->line('');
            }

            // Pass data to middleware
            $this->middlewareDataUpdate->log = array_merge($this->middlewareDataUpdate->log, [
                'count' => $total . ' ' . Str::plural('entry', $total),
                'duration' => $this->getTimer('total'),
                'tracker' => $trackerName,
            ]);

            $this->error('Total Time : ' . $this->middlewareDataUpdate->log['duration']);
        } catch (Exception $e) {
            $this->line('Error: ' . $e->getMessage());
            $this->middlewareDataUpdate->error = $this->reportException($e);
        }
    }

    /**
     * Fetch the tracker configuration from the "service_tracker" micro-service
     * and reshape it into the connection settings used by this command.
     *
     * @param mixed $id tracker identifier (the command's "id" argument)
     * @return Collection keys: host, name, database, plus whatever the
     *                    response's "mongo" entry contains
     */
    protected function normalizeConnectionConfig($id)
    {
        $request = ClientRequest::createObject('service_tracker', "config/" . $id);
        $clientResponse = Client::systemSend('get', $request);

        $tempCollection = collect($clientResponse->content);

        return collect([
            'host' => $tempCollection->get('ip'),
            'name' => $tempCollection->get('name'),
            'database' => $this->database,
        ])->merge($tempCollection->get('mongo'));
    }

    /**
     * Point the tracker connector at the given connection settings.
     *
     * @param Collection $config settings from normalizeConnectionConfig();
     *                           the "name" entry is stripped before use
     * @return void
     */
    protected function setMongoConfig(Collection $config)
    {
        $config = new Config($config->except('name')->toArray());
        $this->mongo->connection($config);
    }

    /**
     * Fetch entries from the current tracker, sorted by ascending `_id`
     * and capped at $limit.
     *
     * @param string $trackerName
     * @return Collection
     */
    protected function getData($trackerName): Collection
    {
        $this->output->write('<info>Get entries from tracker ...</info>', false);
        $this->startTimer('get');

        $datas = $this->mongo->all($this->getQuery($trackerName), [
            'sort' => ['_id' => 1],
            'limit' => $this->limit,
        ]);

        $this->output->write(' ' . $datas->count() . ' entries found', true);
        $this->error($this->getTimer('get', true));
        $this->line('');

        return $datas;
    }

    /**
     * The info displayed when the process starts.
     *
     * @param string $name
     * @return void
     */
    protected function infoHeader(string $name): void
    {
        $message = "Connecting to $name";
        $rule = str_repeat('-', strlen($message));

        $this->line($rule);
        $this->line($message);
        $this->line($rule);
        $this->line('');
    }

    /**
     * The local insert: write one chunk into the local collection
     * "<COLLECTION>_<id>" and record it in the middleware log.
     *
     * @param Collection $chunk
     * @param int $key chunk number
     * @return self
     * @throws Exception when the local insert reports a failure
     */
    protected function insert(Collection $chunk, int $key): self
    {
        $this->createProcessStatus('collect:data-distributor');

        $this->output->write(sprintf($this->messageInsert, $chunk->count(), $key), false);

        // $this->id is not declared in this class; presumably provided by the
        // ProcessFactoryStatusModel trait or the parent -- TODO confirm.
        $collection = self::COLLECTION . '_' . $this->id;

        $inserted = DB::connection(self::CONNECTION)
            ->collection($collection)
            ->insert($chunk->values()->toArray());

        if (!$inserted) {
            $message = 'Unable to insert local data : ' . $this->id;
            $this->middlewareDataUpdate->error = ['message' => $message];
            throw new Exception($message);
        }

        $this->middlewareDataUpdate->log['collections'][] = [
            'collection' => $collection,
            'entries' => $chunk->count(),
        ];

        $this->output->write('done', false);

        $this->activeProcessStatus();

        return $this;
    }

    /**
     * The remote delete: remove from the tracker every document whose `_id`
     * lies between the first and last `_id` of the chunk.
     *
     * NOTE(review): this deletes by range, not by the exact list of pulled
     * ids, so it assumes no foreign document sits inside the range -- confirm.
     *
     * @param Collection $chunk
     * @param int $key chunk number
     * @return void
     * @throws Exception when no `_id` range filter can be built
     */
    protected function delete(Collection $chunk, int $key): void
    {
        $this->info('');

        $insertMessage = sprintf($this->messageInsert, $chunk->count(), $key);
        $deleteMessage = sprintf($this->messageDelete, $key);

        // Padding that aligns "done" with the insert line; clamped because
        // str_repeat() rejects a negative count.
        $repeat = max(0, strlen($insertMessage) - strlen($deleteMessage));

        $this->output->write($deleteMessage, false);

        $query = $this->getQuery(null, $this->getMinMax($chunk)->toArray());

        // An empty filter would match (and delete) the whole remote collection.
        if ($query === []) {
            throw new Exception('Refusing to delete on tracker with an empty filter (chunk #' . $key . ')');
        }

        $del = $this->mongo->deleteMany($query);

        $this->output->write(str_repeat(" ", $repeat) . 'done (' . $del->getDeletedCount() . ')', true);
    }

    /**
     * Return the ids of the first and last item of the given data.
     * Meaningful as a min/max only if the data is ordered by `_id`
     * (getData() requests an ascending `_id` sort).
     *
     * @param Collection $data items exposing a mongoId() method
     * @return Collection keys: min, max
     * @throws Exception when $data is empty
     */
    protected function getMinMax(Collection $data)
    {
        if ($data->isEmpty()) {
            // Without this guard the call below would be made on null, which
            // raises an Error that the Exception handlers of this class miss.
            throw new Exception('Cannot compute the _id range of an empty data set');
        }

        return collect([
            'min' => $data->first()->mongoId(true),
            'max' => $data->last()->mongoId(true),
        ]);
    }

    /**
     * Build the `_id` range filter.
     *
     * The range comes from $array when it is given and non-empty, otherwise
     * from the JSON stored in Redis under $trackerName. When no usable range
     * is available an empty filter is returned.
     *
     * @param string|null $trackerName Redis key holding a stored range
     * @param array|null $array explicit range with "min" and "max" ids
     * @return array
     */
    protected function getQuery($trackerName, ?array $array = null)
    {
        $last = $array;

        if (!$last && $trackerName !== null) {
            // The key may be absent, in which case the client does not
            // return a string and there is nothing to decode.
            $payload = $this->redis->get($trackerName);
            $last = is_string($payload) ? json_decode($payload, true) : null;
        }

        if (is_array($last) && isset($last['min'], $last['max'])) {
            return [
                '_id' => [
                    '$gte' => new ObjectId($last['min']),
                    '$lte' => new ObjectId($last['max']),
                ],
            ];
        }

        return [];
    }
}
236
237    /**
238     * The info displayed when the process start
239     *
240     * @param string $name
241     * @return void
242     */
243    protected function infoHeader(string $name): void
244    {
245        $message = "Connecting to $name";
246
247        $this->line(str_repeat('-', strlen($message)));
248        $this->line($message);
249        $this->line(str_repeat('-', strlen($message)));
250        $this->line('');
251    }
252
253    /**
254     * The local insert
255     *
256     * @param Collection $chunk
257     * @param int $key
258     * @return self
259     * @throws Exception
260     */
261    protected function insert(Collection $chunk, int $key): self
262    {
263        $this->createProcessStatus('collect:data-distributor');
264
265        $this->output->write(sprintf($this->messageInsert, $chunk->count(), $key), false);
266
267        $collection = self::COLLECTION . '_' . $this->id;
268
269        $inserted = DB::connection(self::CONNECTION)
270            ->collection($collection)
271            ->insert($chunk->values()->toArray());
272
273        if (!$inserted) {
274            $this->middlewareDataUpdate->error = ['message' => 'Unable to insert local data : ' . $this->id];
275            throw new Exception('Unable to insert local data : ' . $this->id);
276        }
277
278        $this->middlewareDataUpdate->log['collections'][] = ['collection' => $collection, 'entries' => $chunk->count()];
279
280        $this->output->write('done', false);
281
282        $this->activeProcessStatus();
283
284        return $this;
285    }
286
287    /**
288     * The remote delete
289     *
290     * @param Collection $chunk
291     * @param int $key
292     * @return void
293     */
294    protected function delete(Collection $chunk, int $key): void
295    {
296        $this->info('');
297        $repeat = strlen(sprintf($this->messageInsert, $chunk->count(), $key)) - strlen(sprintf($this->messageDelete, $key));
298        $this->output->write(sprintf($this->messageDelete, $key), false);
299
300        //$query = [ '_id'=> [ '$in' => $chunk->pluck('_id')->toArray()]];
301        $query = $this->getQuery(null, $this->getMinMax($chunk)->toArray());
302        $del = $this->mongo->deleteMany($query);
303
304        $this->output->write(str_repeat(" ", $repeat) . 'done (' . $del->getDeletedCount() . ')', true);
305    }
306
307    /**
308     * Undocumented function
309     *
310     * @param [type] $data
311     * @return Collection
312     */
313    protected function getMinMax(Collection $data)
314    {
315        return collect([
316            'min' => $data->first()->mongoId(true),
317            'max' => $data->last()->mongoId(true),
318        ]);
319    }
320
321    /**
322     * Undocumented function
323     *
324     * @param mixed $collection
325     * @return Array
326     */
327    protected function getQuery($trackerName, array $array = null)
328    {
329        $last = $array ? $array : json_decode($this->redis->get($trackerName), true);
330
331        if ($last) {
332            return [
333                '_id' => [
334                    '$gte' => new ObjectId($last['min']),
335                    '$lte' => new ObjectId($last['max']),
336                ]
337            ];
338        }
339
340        return [];
341    }
342}