Code Coverage |
||||||||||
Classes and Traits |
Functions and Methods |
Lines |
||||||||
| Total | |
0.00% |
0 / 1 |
|
20.00% |
2 / 10 |
CRAP | |
29.79% |
28 / 94 |
| TrackerPullProcess | |
0.00% |
0 / 1 |
|
20.00% |
2 / 10 |
104.61 | |
29.79% |
28 / 94 |
| __construct | |
100.00% |
1 / 1 |
1 | |
100.00% |
10 / 10 |
|||
| handle | |
0.00% |
0 / 1 |
10.31 | |
26.67% |
8 / 30 |
|||
| normalizeConnectionConfig | |
100.00% |
1 / 1 |
1 | |
100.00% |
8 / 8 |
|||
| setMongoConfig | |
0.00% |
0 / 1 |
1.04 | |
66.67% |
2 / 3 |
|||
| getData | |
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 9 |
|||
| infoHeader | |
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 6 |
|||
| insert | |
0.00% |
0 / 1 |
6 | |
0.00% |
0 / 13 |
|||
| delete | |
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 7 |
|||
| getMinMax | |
0.00% |
0 / 1 |
2 | |
0.00% |
0 / 3 |
|||
| getQuery | |
0.00% |
0 / 1 |
12 | |
0.00% |
0 / 5 |
|||
| 1 | <?php |
| 2 | |
| 3 | namespace Qmp\Laravel\Collect\Console\Commands; |
| 4 | |
| 5 | use Exception; |
| 6 | use MongoDB\BSON\ObjectId; |
| 7 | use Illuminate\Support\{Collection, Str}; |
| 8 | use Illuminate\Support\Facades\{DB, Log, Redis}; |
| 9 | use Qmp\Laravel\CommandsLaravel\Middleware\Library\Monitoring\{AddSubProcess, UpdateSubProcess}; |
| 10 | use Qmp\Laravel\DBConnector\Config; |
| 11 | use Qmp\Laravel\DBConnector\Connectors\MongoConnector; |
| 12 | use Qmp\Laravel\MicroService\Client\Client; |
| 13 | use Qmp\Laravel\MicroService\Client\Tools\Request as ClientRequest; |
| 14 | use Qmp\Laravel\AsyncProcessFactory\Traits\ProcessFactoryStatusModel; |
| 15 | use Qmp\Laravel\ToolsLaravel\Traits\Timer; |
| 16 | |
/**
 * Console command pulling entries from a tracker's mongo collection.
 *
 * For the tracker identified by the {id} argument, the command fetches up to
 * $limit entries from the distant collection, inserts them chunk by chunk
 * through the local mongo connection and deletes each copied id range on the
 * tracker. Results and errors are handed to the UpdateSubProcess middleware
 * data object.
 */
class TrackerPullProcess extends \Commando
{
    use Timer, ProcessFactoryStatusModel;

    /**
     * The local mongo connection
     *
     * @var string
     */
    const CONNECTION = 'collectMongo';

    /**
     * The distant collection name, used as prefix in local
     *
     * @var string
     */
    const COLLECTION = 'data_collector';

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'collect:tracker-pull-process {id} {parentId}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Pull all data from trackers';

    /**
     * The mongo connector used on tracker
     *
     * @var MongoConnector
     */
    protected $mongo;

    /**
     * The redis connection named "tracker". It stores, under the tracker
     * name, the min/max ids of the range currently being pulled.
     *
     * @var mixed connection object returned by Redis::connection()
     */
    protected $redis;

    /**
     * The distant database
     *
     * @var string
     */
    protected $database = 'tracker';

    /**
     * Number of entries per chunk when inserting locally / deleting remotely
     *
     * @var integer
     */
    protected $chunkSize = 10000;

    /**
     * Limit volume to pull on tracker
     *
     * @var integer
     */
    protected $limit = 100000;

    /**
     * The insert message (sprintf format: entry count, chunk key)
     *
     * @var string
     */
    protected $messageInsert = "<info>Inserting %s entries from chunk #%s ... </info> ";

    /**
     * The delete message (sprintf format: chunk key)
     *
     * @var string
     */
    protected $messageDelete = "<info>Deleting chunk #%s on tracker ... </info>";

    /**
     * The data object obtained from middlewareData(UpdateSubProcess::class)->get().
     * This command writes its `log` array and its `error` property.
     *
     * @var object
     */
    protected $middlewareDataUpdate;

    /**
     * Class Constructor
     *
     * Prepares the tracker mongo connector and redis connection, then
     * registers the sub-process middlewares.
     */
    public function __construct()
    {
        parent::__construct();

        $this->mongo = new MongoConnector();
        $this->mongo->collection(self::COLLECTION);
        $this->redis = Redis::connection('tracker');

        $this->middlewareDataUpdate = $this->middlewareData(UpdateSubProcess::class)->get();
        $this->middlewareDataUpdate->log['collections'] = [];

        $middlewares = [
            AddSubProcess::class,
            UpdateSubProcess::class,
        ];

        $this->middleware($middlewares);
        $this->middlewareData($middlewares)->createId();
    }

    /**
     * Execute the console command.
     *
     * @return void
     * @throws Exception
     */
    public function handle(): void
    {
        $this->startTimer('total');

        $connection = $this->normalizeConnectionConfig($this->argument('id'));
        $trackerName = $connection->get('name');

        try {
            $this->setMongoConfig($connection);
            $this->infoHeader($trackerName);

            $data = $this->getData($trackerName);
            $total = $data->count();

            if ($total) {
                // Store the pulled id range; getQuery() reads it back and restricts
                // the fetch to this range for as long as the key is present.
                $this->redis->set($trackerName, $this->getMinMax($data)->toJson());

                $this->startTimer('insert');

                $data->chunk($this->chunkSize)->each(function ($chunk, $key) {
                    try {
                        $this->insert($chunk, $key)
                            ->delete($chunk, $key);
                    } catch (Exception $e) {
                        // Best effort: a failing chunk must not stop the following ones,
                        // but the failure has to be visible in the logs.
                        Log::error($e->getMessage(), ['exception' => $e]);
                    }

                    $this->error($this->getTimer('insert', true));
                    $this->line('');
                });

                $this->redis->del($trackerName);
            } else {
                $this->info('Up to date !');
                $this->line('');
            }

            // Pass data to middleware
            $this->middlewareDataUpdate->log = array_merge($this->middlewareDataUpdate->log, [
                'count' => $total . ' ' . Str::plural('entry', $total),
                'duration' => $this->getTimer('total'),
                'tracker' => $trackerName,
            ]);

            $this->error('Total Time : ' . $this->middlewareDataUpdate->log['duration']);
        } catch (Exception $e) {
            $this->line('Error: ' . $e->getMessage());
            $this->middlewareDataUpdate->error = $this->reportException($e);
        }
    }

    /**
     * Fetch the tracker configuration from the "service_tracker" service and
     * flatten it into the settings used to reach the tracker.
     *
     * The result holds `host` (the tracker `ip`), `name` and `database`,
     * merged with whatever the response exposes under `mongo`.
     *
     * @param mixed $id the tracker id, appended to the "config/" route
     * @return Collection
     */
    protected function normalizeConnectionConfig($id)
    {
        $request = ClientRequest::createObject('service_tracker', "config/" . $id);
        $clientResponse = Client::systemSend('get', $request);

        $tempCollection = collect($clientResponse->content);

        return collect([
            'host' => $tempCollection->get('ip'),
            'name' => $tempCollection->get('name'),
            'database' => $this->database,
        ])->merge($tempCollection->get('mongo'));
    }

    /**
     * Point the tracker mongo connector to the given configuration.
     *
     * The `name` entry is dropped before building the connector Config.
     *
     * @param Collection $config
     * @return void
     */
    protected function setMongoConfig(Collection $config)
    {
        $mongoConfig = new Config($config->except('name')->toArray());
        $this->mongo->connection($mongoConfig);
    }

    /**
     * Fetch entries from the current tracker, sorted by ascending _id and
     * capped to $limit.
     *
     * @param string $trackerName
     * @return Collection
     */
    protected function getData($trackerName): Collection
    {
        $this->output->write('<info>Get entries from tracker ...</info>', false);
        $this->startTimer('get');

        $datas = $this->mongo->all($this->getQuery($trackerName), [
            'sort' => ['_id' => 1],
            'limit' => $this->limit,
        ]);

        $this->output->write(' ' . $datas->count() . ' entries found', true);
        $this->error($this->getTimer('get', true));
        $this->line('');

        return $datas;
    }

    /**
     * The info displayed when the process start
     *
     * @param string $name
     * @return void
     */
    protected function infoHeader(string $name): void
    {
        $message = "Connecting to $name";
        $separator = str_repeat('-', strlen($message));

        $this->line($separator);
        $this->line($message);
        $this->line($separator);
        $this->line('');
    }

    /**
     * The local insert
     *
     * Inserts the chunk into the local collection named
     * COLLECTION . '_' . $this->id and records it in the middleware log.
     *
     * @param Collection $chunk
     * @param int $key the chunk index, only used for console output
     * @return self
     * @throws Exception when the local insert reports a failure
     */
    protected function insert(Collection $chunk, int $key): self
    {
        $this->createProcessStatus('collect:data-distributor');

        $entries = $chunk->count();
        $this->output->write(sprintf($this->messageInsert, $entries, $key), false);

        $collection = self::COLLECTION . '_' . $this->id;

        $inserted = DB::connection(self::CONNECTION)
            ->collection($collection)
            ->insert($chunk->values()->toArray());

        if (!$inserted) {
            $message = 'Unable to insert local data : ' . $this->id;
            $this->middlewareDataUpdate->error = ['message' => $message];
            throw new Exception($message);
        }

        $this->middlewareDataUpdate->log['collections'][] = ['collection' => $collection, 'entries' => $entries];

        $this->output->write('done', false);

        $this->activeProcessStatus();

        return $this;
    }

    /**
     * The remote delete
     *
     * Deletes on the tracker every entry whose _id lies between the first
     * and the last id of the chunk (both included).
     *
     * @param Collection $chunk
     * @param int $key the chunk index, only used for console output
     * @return void
     */
    protected function delete(Collection $chunk, int $key): void
    {
        $this->info('');

        $deleteMessage = sprintf($this->messageDelete, $key);
        $insertLength = strlen(sprintf($this->messageInsert, $chunk->count(), $key));

        // Pad so that "done" lines up with the insert line. Clamp to zero because
        // str_repeat() throws a ValueError on a negative count (PHP 8), which
        // happens as soon as the delete message is the longer of the two.
        $padding = max(0, $insertLength - strlen($deleteMessage));

        $this->output->write($deleteMessage, false);

        //$query = [ '_id'=> [ '$in' => $chunk->pluck('_id')->toArray()]];
        $query = $this->getQuery(null, $this->getMinMax($chunk)->toArray());
        $del = $this->mongo->deleteMany($query);

        $this->output->write(str_repeat(" ", $padding) . 'done (' . $del->getDeletedCount() . ')', true);
    }

    /**
     * Extract the ids of the first and last entries of a collection.
     *
     * The collection must not be empty: first()/last() would return null.
     * NOTE(review): mongoId(true) presumably returns the id as a string, since
     * the values are JSON encoded and fed to new ObjectId() - confirm.
     *
     * @param Collection $data
     * @return Collection with the keys `min` and `max`
     */
    protected function getMinMax(Collection $data)
    {
        return collect([
            'min' => $data->first()->mongoId(true),
            'max' => $data->last()->mongoId(true),
        ]);
    }

    /**
     * Build the mongo filter matching an inclusive _id range.
     *
     * The range comes from $array when given, otherwise from the JSON value
     * stored in redis under the tracker name. Without any range the filter is
     * empty.
     *
     * @param string|null $trackerName redis key holding a stored range
     * @param array|null $array explicit range with the keys `min` and `max`
     * @return array
     */
    protected function getQuery($trackerName, ?array $array = null)
    {
        $last = $array;

        if (!$last && $trackerName !== null) {
            $stored = $this->redis->get($trackerName);
            // A missing key does not come back as a string: skip json_decode()
            // instead of passing it null/false.
            $last = is_string($stored) ? json_decode($stored, true) : null;
        }

        if (!$last) {
            return [];
        }

        return [
            '_id' => [
                '$gte' => new ObjectId($last['min']),
                '$lte' => new ObjectId($last['max']),
            ],
        ];
    }
}